Sentinel 流控原理

我们在项目中添加 Spring Cloud Sentinel 依赖添加后 spring-cloud-starter-alibaba-sentinel 在 Spring-Boot 启动的过程中回去初始化 spring.factories 中的配置信息,如:SentinelWebAutoConfigurationSentinelAutoConfiguration 等配置文件来初始化

再讲代码之前我先声明一下我的版本号sentinel 1.8.0 。后续的所有内容均基于该版本进行

@ResoureSetinel 工作原理

配置流控规则我们最简单的方式就是通过 @ResoureSetinel 的方式来管理,该注解可以直接定义流控规则、降级规则。下面是一个简单的使用例子:

@SentinelResource(value = "ResOrderGet",
                  fallback = "fallback",
                  fallbackClass = SentinelResourceExceptionHandler.class,
                  blockHandler = "blockHandler",
                  blockHandlerClass = SentinelResourceExceptionHandler.class
                 )
@GetMapping("/order/get/{id}")
public CommonResult<StockModel> getStockDetails(@PathVariable Integer id) {
  StockModel stockModel = new StockModel();
  stockModel.setCode("STOCK==>1000");
  stockModel.setId(id);
  return CommonResult.success(stockModel);
}
复制代码

如果大家熟悉 Spring 相关的组件大家都可以想到,这里多半是通过Spring Aop. 的方式来拦截 getStockDetails 方法。我们先看看SentinelAutoConfiguration 配置文件,我们可以找到 SentinelResourceAspect Bean 的定义方法。

@Bean
@ConditionalOnMissingBean
public SentinelResourceAspect sentinelResourceAspect() {
   return new SentinelResourceAspect();
}
复制代码

让后我们再来看看 SentinelResourceAspect 具体是怎么处理的,源码如下:

// 定义 Pointcut
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}

// Around 来对被标记 @SentinelResource 注解的方法进行处理
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
  Method originMethod = resolveMethod(pjp);

  // 获取注解信息
  SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);

  // 获取资源名称
  String resourceName = getResourceName(annotation.value(), originMethod);
  EntryType entryType = annotation.entryType();
  int resourceType = annotation.resourceType();
  Entry entry = null;
  try {
    // 执行 entry
    entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
    // 执行业务方法
    Object result = pjp.proceed();
    // 返回
    return result;
  } catch (BlockException ex) {
    // 处理 BlockException
    return handleBlockException(pjp, annotation, ex);
  } catch (Throwable ex) {
    Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
    // The ignore list will be checked first.
    if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
      throw ex;
    }
    if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
      traceException(ex);
      // 处理降级
      return handleFallback(pjp, annotation, ex);
    }

    // No fallback function can handle the exception, so throw it out.
    throw ex;
  }
}
复制代码

我们总结一下, @SentinelResource 的执行过程, 首先是通过 Aop 进行拦截,然后通过 SphU.entry 执行对应的流控规则,最后调用业务方法。如果触发流控规则首先处理流控异常 BlockException 然后在判断是否有服务降级的处理,如果有就调用 fallback 方法。通过 handleBlockExceptionhandleFallback 进行处理。

责任链模式处理流控

通过上面的梳理,我们知道对于流控的过程,核心处理方法就是 SphU.entry 。 在这个方法中其实主要就是初始化流控 Solt 和执行 Solt. 在这个过程中会对:簇点定义、流量控制、熔断降级、系统白名单等页面功能进行处理。

1. 初始化责任链

下面是初始化 Solt 的核心代码在 SphU.entryWithPriority

// 删减部分代码
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
  throws BlockException {

  // 初始化责任链
  ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

  Entry e = new CtEntry(resourceWrapper, chain, context);
  try {
    // 执行 entry
    chain.entry(context, resourceWrapper, null, count, prioritized, args);
  } catch (BlockException e1) {
    e.exit(count, args);
    // 异常抛出,让 SentinelResourceAspect.invokeResourceWithSentinel 统一处理
    throw e1;
  } catch (Throwable e1) {
    // This should not happen, unless there are errors existing in Sentinel internal.
    RecordLog.info("Sentinel unexpected exception", e1);
  }
  return e;
}
复制代码

通过 lookProcessChain 方法我逐步的查找,我们可以看到最终的责任链初始化类,默认是 DefaultSlotChainBuilder

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // Note: the instances of ProcessorSlot should be different, since they are not stateless.
        // 通过 SPI 去加载所有的  ProcessorSlot 实现,通过 Order 排序
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
					  // 添加到 chain 尾部
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}
复制代码

2. 责任链的处理过程

我们可以通过断点的方式来查看在 sortedSlotList 集合中所有的 solt 顺序如下图所示

sentinel 流控规则初始化.png

我们可以通过如下的顺序进行逐个的简单的分析一下

  1. NodeSelectorSolt
  2. CusterBuilderSolt
  3. LogSlot
  4. StatisicSlot
  5. AuthoritySolt
  6. SystemSolts
  7. ParamFlowSolt
  8. FlowSolt
  9. DegradeSlot

对于 Sentinel 的 Slot 流控协作流程可以参考官方给出的文档, 如下图所示:

断路器工作协作原理.png

FlowSolt 流控

通过 NodeSelectorSolt、CusterBuilderSolt、StatisicSlot 等一系列的请求数据处理,在 FlowSolt 会进入流控规则,所有的 Solt 都会执行 entry 方法, 如下所示

// FlowSolt 的 entry 方法
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
  // 检查流量
  checkFlow(resourceWrapper, context, node, count, prioritized);

  fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
复制代码

在后续的流程中,会执进行判断具体的流控策略,默认是快速失败,会执行 DefaultController 方法。

// DefaultController
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
  // 当前资源的调用次数
  int curCount = avgUsedTokens(node);
  // 当前资源的调用次数 + 1 > 当前阈值
  if (curCount + acquireCount > count) {
    // 删减比分代码
    // 不通过
    return false;
  }
  // 通过
  return true;
}

private int avgUsedTokens(Node node) {
  if (node == null) {
    return DEFAULT_AVG_USED_TOKENS;
  }
  return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
复制代码

如果上面返回不通过会回到,那么会抛出 FlowException

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
  if (ruleProvider == null || resource == null) {
    return;
  }
  Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
  if (rules != null) {
    for (FlowRule rule : rules) {
      if (!canPassCheck(rule, context, node, count, prioritized)) {
        // 流控规则不通过,会抛出 FlowException
        throw new FlowException(rule.getLimitApp(), rule);
      }
    }
  }
}
复制代码

然后会在 StatisticSlot 中增加统计信息, 最后会抛出给 SentinelResourceAspect 进行处理,完成流控功能。我们再来看看这个异常信息,如果是BlockException 异常,会进入 handleBlockException 方法处理, 如果是其他的业务异常首先会判断是否有配置 fallback 处理如果有,就调用 handleFallback 没有就继续往外抛,至此完成流控功能

try {
  entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
  Object result = pjp.proceed();
  return result;
} catch (BlockException ex) {
  return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
  Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
  // The ignore list will be checked first.
  if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
    throw ex;
  }
  if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
    traceException(ex);
    return handleFallback(pjp, annotation, ex);
  }

  // No fallback function can handle the exception, so throw it out.
  throw ex;
}
复制代码

DegradeSlot 降级

断路器的作用是当某些资源一直出现故障时,将触发断路器。断路器不会继续访问已经发生故障的资源,而是拦截请求并返回故障信号。

Sentinel 在 DegradeSlot 这个 Slot 中实现了熔断降级的功能,它有三个状态 OPENHALF_OPENCLOSEDResponseTimeCircuitBreaker RT 响应时间维度来分析, 断路器工作的过程。下面是一个标准断路器的工作流程:

断路器工作协作原理.png

在 Sentinel 实现的源码过程如下图所示:

Sentinel 断路器源码处理过程.png

Sentinel 通过 Web 拦截器

Sentinel 在默认情况下, 不使用 @ResourceSentinel 注解实现流控的时候, Sentinel 通过拦截器进行流控实现的。 初始化类在 SentinelWebAutoConfiguration 它实现了 WebMvcConfigurer 接口,在 sentinelWebInterceptor 方法初始化 SentinelWebInterceptor 等 Bean。

@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
                       matchIfMissing = true)
public SentinelWebInterceptor sentinelWebInterceptor(
  SentinelWebMvcConfig sentinelWebMvcConfig) {
  return new SentinelWebInterceptor(sentinelWebMvcConfig);
}
复制代码

我们在 SentinelWebInterceptor 的核心方法 preHandle 中处理,这里面我们又可以看到 SphU.entry 熟悉的过程调用流控的责任链。由于逻辑都类似,此处不在多说。代码如下:

public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
  throws Exception {
  try {
    String resourceName = getResourceName(request);

    if (StringUtil.isEmpty(resourceName)) {
      return true;
    }

    if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
      return true;
    }

    // Parse the request origin using registered origin parser.
    String origin = parseOrigin(request);
    String contextName = getContextName(request);
    ContextUtil.enter(contextName, origin);
    Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
    request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
    return true;
  } catch (BlockException e) {
    try {
      handleBlockException(request, response, e);
    } finally {
      ContextUtil.exit();
    }
    return false;
  }
}
复制代码

参考文档

github.com/alibaba/Sen…

martinfowler.com/bliki/Circu…

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享