Sentinel:分布式系统的流量防卫兵(防御系统)
面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。
Sentinel工作原理
1、架构
ProcessorSlotChain(核心骨架):将不同的 Slot 按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。
系统会为每个资源创建一套SlotChain。
2、SPI机制
Sentinel槽链中Slot执行顺序是固定的,但并不是绝对的。Sentinel将ProcessorSlot作为SPI接口进行扩展,使得SlotChain具备了扩展能力。用户可以自定义Slot并编排Slot间的顺序。
代码实现
继承AbstractLinkedProcessorSlot,并设置@Spi(order)
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
... ...
}
复制代码
3、slot
工作流程: Sentinel工作主流程就包含在SphU.entry方法里,通过链式调用的方式,经过了建立树状结构,保存统计簇点,异常日志记录,实时数据统计,负载保护,权限认证,流量控制,熔断降级等Slot
调用链:
META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
NodeSelectorSlot >>> ClusterBuilderSlot >>> LogSlot >>> StatisticSlot >>> ParamFlowSlot >>> SystemSlot >>> AuthoritySlot >>> FlowSlot >>> DegradeSlot
- NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
- ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
- StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
- ParamFlowSlot(热点流控) 对应热点流控(针对资源的热点参数做流量控制)
- SystemSlot(系统规则) 则通过系统的状态,例如 load1 等,来控制总的入口流量;(针对当前服务做全局流量控制)
- AuthoritySlot(授权规则) 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;(对访问资源的特定应用做授权处理)
- FlowSlot(流控规则) 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;(针对资源流量控制)
- DegradeSlot(降级规则) 则通过统计信息以及预设的规则,来做熔断降级;(针对资源的调度情况来做降级处理)
4、Node
树形结构
类关系
Entry(资源):包含了资源名、curNode(当前统计节点)、originNode(来源统计节点)等信息。构造函数中会做调用链的变换,即将当前 Entry 接到传入 Context 的调用链路上。
Context(资源操作上下文):每个资源操作必须属于一个Context(通过ThreadLocal 传递)。若未指定,会创建默认name=sentinel_default_context。一个Context生命周期中可以包含多个
资源操作。Context生命周期中的最后一个资源在exit()时会清理该Conetxt,意味着这个Context生命周期结束。
节点 | 说明 | 维度 | 创建时机 | 备注 |
---|---|---|---|---|
ROOT | invocation tree(调用树)根 | 一个应用创建一个 | 系统启动 | |
EntranceNode | 入口节点,某个Context(一次请求)入口的所有调用数据 | context | ContextUtil.enter | context |
DefaultNode | 链路节点,用于统计调用链路上某个资源的数据 | resource * context | NodeSelectorSlot根据context创建 | set curNode to context |
ClusterNode | 簇点,用于统计每个资源全局的数据 | resource | ClusterBuilderSlot根据resourceName创建 | set clusterNode to defaultNode |
StatisticNode | 统计节点,包含秒/分钟级滑动窗口 | resource * origin | 来源节点根据origin创建 | set originNode to curEntry |
核心源码
Sentinel每种资源(Entry)有一个独有的Slot Chain,一起实现整体的流量控制。
核心类:
- SphU – Sentinel静态调用入口
- CtSph – 实际调用入口
- Context – 资源上下文,同一个资源可以包含在不同的context中
- CtEntry – 代表实际资源
- DefaultProcessorSlotChain – slot chain默认实现
- ProcessorSlot及子类 – 不同的slot实现
1、SentinelResourceAspect – 入口
Spring AOP:AspectJ 切入点 (以注解方式为例)
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
// 切入点为:@SentinelResource
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
// 环绕通知
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
... ...
String resourceName = getResourceName(annotation.value(), originMethod);
EntryType entryType = annotation.entryType();
int resourceType = annotation.resourceType();
Entry entry = null;
try {
// 【资源:工作主流程】要织入的、增强的功能
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 调用目标方法
return pjp.proceed();
} catch (BlockException ex) {
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
// No fallback function can handle the exception, so throw it out.
throw ex;
} finally {
if (entry != null) {
// 当前资源增强功能处理结束
entry.exit(1, pjp.getArgs());
}
}
}
}
复制代码
2、调用入口
2.1、SphU – 静态调用入口
主要做了五件事
- 1、将资源名称和流量类型进行包装
- 2、从当前线程得到context,如果之前没有创建context,则这里会创建一个context-name为sentinel_default_name、original为””的context
- 3、添加一个规则检查调用链,根据我们配置的规则一层一层进行检查,只要在某一个规则未通过就提前结束抛出该规则对应的异常
- 4、创建一个流量入口entry,它用来保存本次调用的信息,将context的curEntry进行指定
- 5、开始执行规则检查调用链
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
throws BlockException {
// name:资源名, resourceType:资源类型,entryType:流量类型为入口还是出口(系统规则只针对入口流量),args:参数,后面做热点参数规则时用到
// batchCount:默认1个请求
return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}
复制代码
2.2、CtSph – 实际调用入口
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 【关注点】当前线程持有的context(ThreadLocal):一个请求占用一个线程、一个线程绑定一个context
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// 当前系统中的context数量(请求数量)超出阈值:直接返回一个无需校验规则的资源对象
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// 创建默认名称(sentinel_default_context):放入ThreadLocal
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 全局开关-关闭:不进行规则检查,直接返回一个无需校验规则的资源对象
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 添加一个规则检查调用链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// 未找到chain(意味chain数量超出阈值):直接返回一个无需校验规则的资源对象
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
// 创建一个资源对象:一个流量入口,将context curEntry进行指定
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 开始规则检查
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
// 发生流控异常进行退出
e.exit(count, args);
// 将异常向上抛
throw e1;
} catch (Throwable e1) {
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
复制代码
3、Context – 资源上下文
同一个资源可以包含在不同的context中:统计资源的调用信息,如QPS、rt等信息
protected static Context trueEnter(String name, String origin) {
// 尝试:从当前线程上下文(ThreadLocal)中拿
Context context = contextHolder.get();
if (context == null) {
// 尝试:从缓存map的key=context-name,value=EntranceNode
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
// 获取EntranceNode:context-name对应的DefaultNode
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
// 限制2000,也就是最多申明2000不同名称的上下文
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
// 防止并发,再次检查
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// 创建EntranceNode
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node. 将新建的node添加到ROOT
Constants.ROOT.addChild(node);
// 将新建node写入缓存map
// 为了"防止迭代稳定性问题":iterate stable (对于共享集合的写操作:否则可能引发读操作读到脏数据)
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
// 将context-name与EntranceNode封装为context
context = new Context(node, name);
// 初始化context来源
context.setOrigin(origin);
// 将context写入ThreadLocal
contextHolder.set(context);
}
return context;
}
复制代码
4、DefaultProcessorSlotChain – slot chain默认实现
单向链表:默认创建一个节点,且两个指针(first、end)同时指向该节点
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
// 转换操作对象:从first节点转向下一个节点
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
// end节点下一个节点:指定新的节点
end.setNext(protocolProcessor);
// end节点:设为新的节点
end = protocolProcessor;
}
复制代码
5、ProcessorSlot及子类 – 不同的slot实现
META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
获取SlotChain:按顺序挨个判断
// CtSph
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 从缓存:获取当前资源的SlotChain(key=资源,value=其相关ProcessorSlotChain)
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 缓存无:创建并放入缓存
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry// 创建新的chain size limit.
// 缓存map的size >= chain数量最大阈值,直接返回null
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 【重点】创建新的chain
chain = SlotChainProvider.newSlotChain();
// 防止迭代稳定性问题
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
复制代码
5.1、NodeSelectorSlot
负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级
根据 context 创建 DefaultNode。
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 从缓存中获取DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 创建DefaultNode
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
// 将新建node添加到调用树
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
// 【关注点】触发下一个节点
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
复制代码
5.2、StatisticSlot
记录、统计不同纬度的 runtime 指标监控信息;
注意:先调用SlotChain中后续的所有Slot,完成所有规则检测。然后再统计。
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 向后传递:调用SlotChain中后续的所有Slot,完成所有规则检测(执行过程中可能回抛出异常:如,BlockException)
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 前面所有规则检测通过:对DefaultNode添加线程数和qps(通过的请求数量:涉及滑动窗口)
node.increaseThreadNum();
node.addPassRequest(count);
... ...
}
复制代码
5.3、FlowSlot
根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 获取当前时间窗已统计数据:node的ThreadNum或QPS
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
// 设置当前流量为优先级和流控模式为QPS(prioritized=true):要等待
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
// 算出拿到当前令牌数的等待时间(ms)
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
// OccupyTimeoutProperty.getOccupyTimeout = 500ms
// 如果流量具有优先级,会获取未来的令牌数
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
// 添加占用未来的QPS,会调用OccupiableBucketLeapArray.addWaiting(long time, int acquireCount)
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
复制代码
5.5、DegradeSlot
通过统计信息以及预设的规则,来做熔断降级;
注意:只看到了状态从OPEN变为HALF_OPEN,HALF_OPEN变为OPEN,但没有看到状态如何从HALF_OPEN变为CLOSE的,它的变化过程是在正常执行完请求后,entry.exit()会调用DegradeSlot.exit()方法来改变状态
@Override
public boolean tryPass(Context context) {
// Template implementation.
// 正常通行
if (currentState.get() == State.CLOSED) {
return true;
}
// 尝试通行
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
// 下次时间窗时间点到了,且 open变为halfOpen (【注意】halfOpen:只能由系统从open转为)
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
protected boolean fromOpenToHalfOpen(Context context) {
// 尝试将状态从OPEN设置为HALF_OPEN
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
// 状态变化通知
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
// 在entry添加一个exitHandler entry.exit()时会调用
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
// 如果有发生异常,重新将状态设置为OPEN 请求不同通过
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}
复制代码
6、Sentinel Dashboard服务端源码
主要做三件事
- 使用spi加载com.alibaba.csp.sentinel.init.InitFunc的一些实现类;
- 将加载后的实现类进行排序;
- 调用这些实现类的初始化方法
- CommandCenterInitFunc:获取命令中心,做一些准备工作(注册dashboard接口处理器),然后创建一个socket监听8719端口(sentinel与客户端通信的端口号)
- HeartbeatSenderInitFunc:心跳相关任务初始化
- MetricCallbackInit:注册扩展的入口和出口回调类
- ParamFlowStatisticSlotCallbackInit:注册参数流入口和出口回调类