Skip to main content

Apache ShenYu整合Sentinel源码分析

moremind...About 2 minopensourceApache ShenYusentinel

1.前言

Apache ShenYu是一款支持多语言、多协议(Dubbo,SpringCloud,gRPC,Motan,SofaTars, BRPC)、插件化设计、高度可动态化配置、高度可自主化开发的Java网关。内置丰富的插件支持,鉴权,限流,熔断,防火墙等等。流量配置动态化,性能极高。支持集群部署,支持 A/B Test,蓝绿发布等功能。

Sentinel是阿里巴巴开源的一款流量控制组件,主要用于流量控制、熔断降级、系统负载保护等。本文将介绍如何在Apache ShenYu中整合Sentinel的源码分析。

2.Apache ShenYu整合Sentinel源码分析

2.1 如何设置Sentinel加载资源的resourceName

ShenYu通过rule中获取selectorId和ruleId,拼接成selectorId_ruleId形式的resourceName,例如:150232_1233

    public String getKey(final RuleData ruleData) {
        return String.join("_", ruleData.getSelectorId(), ruleData.getId());
    }

2.1 Sentinel加载ShenYu配置的限流、降级规则

ShenYu通过org.apache.shenyu.plugin.sentinel.handler.SentinelRuleHandle接收插件所配置的限流、降级规则,然后通过org.apache.shenyu.plugin.sentinel.handler.SentinelRuleHandle#handlerRule方法将规则加载到Sentinel中。

ShenYu会首先从Sentinel中获取流控、降级规则,然后将ShenYu配置的规则添加到Sentinel中,最后将所有规则加载到Sentinel中。

public class SentinelRuleHandle implements PluginDataHandler {
    
    @Override
    public void handlerRule(final RuleData ruleData) {
        SentinelHandle sentinelHandle = GsonUtils.getInstance().fromJson(ruleData.getHandle(), SentinelHandle.class);
        sentinelHandle.checkData(sentinelHandle);
        String key = CacheKeyUtils.INST.getKey(ruleData);
        // 获取当前资源的流控规则
        List<FlowRule> flowRules = FlowRuleManager.getRules()
                .stream()
                .filter(r -> !r.getResource().equals(key))
                .collect(Collectors.toList());
        if (sentinelHandle.getFlowRuleEnable() == Constants.SENTINEL_ENABLE_FLOW_RULE) {
            FlowRule rule = new FlowRule(key);
            rule.setCount(sentinelHandle.getFlowRuleCount());
            rule.setGrade(sentinelHandle.getFlowRuleGrade());
            rule.setControlBehavior(sentinelHandle.getFlowRuleControlBehavior());
            rule.setMaxQueueingTimeMs(sentinelHandle.getFlowRuleMaxQueueingTimeMs());
            rule.setWarmUpPeriodSec(sentinelHandle.getFlowRuleWarmUpPeriodSec());
            flowRules.add(rule);
        }
        // 加载流控规则
        FlowRuleManager.loadRules(flowRules);
        
        // 获取当前资源的降级规则
        List<DegradeRule> degradeRules = DegradeRuleManager.getRules()
                .stream()
                .filter(r -> !r.getResource().equals(key))
                .collect(Collectors.toList());
        if (sentinelHandle.getDegradeRuleEnable() == Constants.SENTINEL_ENABLE_DEGRADE_RULE) {
            DegradeRule rule = new DegradeRule(key);
            rule.setCount(sentinelHandle.getDegradeRuleCount());
            rule.setGrade(sentinelHandle.getDegradeRuleGrade());
            rule.setTimeWindow(sentinelHandle.getDegradeRuleTimeWindow());
            rule.setStatIntervalMs(sentinelHandle.getDegradeRuleStatIntervals() * 1000);
            rule.setMinRequestAmount(sentinelHandle.getDegradeRuleMinRequestAmount());
            rule.setSlowRatioThreshold(sentinelHandle.getDegradeRuleSlowRatioThreshold());
            degradeRules.add(rule);
        }
        // 加载降级规则
        DegradeRuleManager.loadRules(degradeRules);
    }
}

2.2 ShenYu是如何获取应用错误并进行流控、降级的

2.2.1 在exchange注册http状态码消费者?何时消费?

这个http状态码的消费者主要作用就是根据响应码抛出SentinelFallbackException,而SentinelFallbackException就是相当于收集来自原应用端的异常信息。

exchange.getAttributes().put(Constants.WATCHER_HTTP_STATUS, (Consumer<HttpStatus>) status -> {
    if (status == null || !status.is2xxSuccessful()) {
        throw new SentinelFallbackException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
    }
});

2.2.1 ShenYu是如何将应用资源交予Sentinel管理的

chain.execute(exchange).transform(new SentinelReactorTransformer<>(resourceName)),其中的transform方法就是ShenYu将应用资源交予Sentinel的实现。
其中new SentinelReactorTransformer<>(resourceName)相当于sentinel的Entry entry = SphU.entry("HelloWorld")

exchange.getAttributes().put(Constants.WATCHER_HTTP_STATUS, (Consumer<HttpStatus>) status -> {
    if (status == null || !status.is2xxSuccessful()) {
        throw new SentinelFallbackException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
    }
});
return chain.execute(exchange).transform(new SentinelReactorTransformer<>(resourceName)).onErrorResume(throwable ->
        fallbackHandler.fallback(exchange, UriUtils.createUri(sentinelHandle.getFallbackUri()), throwable));

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.8