Apache ShenYu整合Sentinel源码分析
1.前言
Apache ShenYu是一款支持多语言、多协议(Dubbo,SpringCloud,gRPC,Motan,SofaTars, BRPC)、插件化设计、高度可动态化配置、高度可自主化开发的Java网关。内置丰富的插件支持,鉴权,限流,熔断,防火墙等等。流量配置动态化,性能极高。支持集群部署,支持 A/B Test,蓝绿发布等功能。
Sentinel是阿里巴巴开源的一款流量控制组件,主要用于流量控制、熔断降级、系统负载保护等。本文将介绍如何在Apache ShenYu中整合Sentinel的源码分析。
2.Apache ShenYu整合Sentinel源码分析
2.1 如何设置Sentinel加载资源的resourceName
ShenYu通过rule中获取selectorId和ruleId,拼接成selectorId_ruleId形式的resourceName,例如:150232_1233
public String getKey(final RuleData ruleData) {
return String.join("_", ruleData.getSelectorId(), ruleData.getId());
}
2.1 Sentinel加载ShenYu配置的限流、降级规则
ShenYu通过org.apache.shenyu.plugin.sentinel.handler.SentinelRuleHandle
接收插件所配置的限流、降级规则,然后通过org.apache.shenyu.plugin.sentinel.handler.SentinelRuleHandle#handlerRule
方法将规则加载到Sentinel中。
ShenYu会首先从Sentinel中获取流控、降级规则,然后将ShenYu配置的规则添加到Sentinel中,最后将所有规则加载到Sentinel中。
public class SentinelRuleHandle implements PluginDataHandler {
@Override
public void handlerRule(final RuleData ruleData) {
SentinelHandle sentinelHandle = GsonUtils.getInstance().fromJson(ruleData.getHandle(), SentinelHandle.class);
sentinelHandle.checkData(sentinelHandle);
String key = CacheKeyUtils.INST.getKey(ruleData);
// 获取当前资源的流控规则
List<FlowRule> flowRules = FlowRuleManager.getRules()
.stream()
.filter(r -> !r.getResource().equals(key))
.collect(Collectors.toList());
if (sentinelHandle.getFlowRuleEnable() == Constants.SENTINEL_ENABLE_FLOW_RULE) {
FlowRule rule = new FlowRule(key);
rule.setCount(sentinelHandle.getFlowRuleCount());
rule.setGrade(sentinelHandle.getFlowRuleGrade());
rule.setControlBehavior(sentinelHandle.getFlowRuleControlBehavior());
rule.setMaxQueueingTimeMs(sentinelHandle.getFlowRuleMaxQueueingTimeMs());
rule.setWarmUpPeriodSec(sentinelHandle.getFlowRuleWarmUpPeriodSec());
flowRules.add(rule);
}
// 加载流控规则
FlowRuleManager.loadRules(flowRules);
// 获取当前资源的降级规则
List<DegradeRule> degradeRules = DegradeRuleManager.getRules()
.stream()
.filter(r -> !r.getResource().equals(key))
.collect(Collectors.toList());
if (sentinelHandle.getDegradeRuleEnable() == Constants.SENTINEL_ENABLE_DEGRADE_RULE) {
DegradeRule rule = new DegradeRule(key);
rule.setCount(sentinelHandle.getDegradeRuleCount());
rule.setGrade(sentinelHandle.getDegradeRuleGrade());
rule.setTimeWindow(sentinelHandle.getDegradeRuleTimeWindow());
rule.setStatIntervalMs(sentinelHandle.getDegradeRuleStatIntervals() * 1000);
rule.setMinRequestAmount(sentinelHandle.getDegradeRuleMinRequestAmount());
rule.setSlowRatioThreshold(sentinelHandle.getDegradeRuleSlowRatioThreshold());
degradeRules.add(rule);
}
// 加载降级规则
DegradeRuleManager.loadRules(degradeRules);
}
}
2.2 ShenYu是如何获取应用错误并进行流控、降级的
2.2.1 在exchange注册http状态码消费者?何时消费?
这个http状态码的消费者主要作用就是根据响应码抛出SentinelFallbackException,而SentinelFallbackExceptio
n就是相当于收集来自原应用端的异常信息。
exchange.getAttributes().put(Constants.WATCHER_HTTP_STATUS, (Consumer<HttpStatus>) status -> {
if (status == null || !status.is2xxSuccessful()) {
throw new SentinelFallbackException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
}
});
2.2.1 ShenYu是如何将应用资源交予Sentinel管理的
chain.execute(exchange).transform(new SentinelReactorTransformer<>(resourceName))
,其中的transform方法就是ShenYu将应用资源交予Sentinel的实现。
其中new SentinelReactorTransformer<>(resourceName)
相当于sentinel的Entry entry = SphU.entry("HelloWorld")
exchange.getAttributes().put(Constants.WATCHER_HTTP_STATUS, (Consumer<HttpStatus>) status -> {
if (status == null || !status.is2xxSuccessful()) {
throw new SentinelFallbackException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
}
});
return chain.execute(exchange).transform(new SentinelReactorTransformer<>(resourceName)).onErrorResume(throwable ->
fallbackHandler.fallback(exchange, UriUtils.createUri(sentinelHandle.getFallbackUri()), throwable));