算法原理
1、(固定)时间窗限流算法
- 特点:系统自动选定一个时间窗口的起始零点,然后按照固定长度将时间轴划分为若干定长的时间窗口。
- 原理:判断请求到达的时间点所在的时间窗口当前统计的数据是否超出阈值。
- 缺点:跨窗口范围的数据超出阈值问题。
2、 滑动时间窗限流算法
- 特点:没有划分固定的时间窗起点与终点,而是将每一次请求的到来时间点作为统计时间窗的终点,起点则是终点向前推时间窗长度的时间点。
- 原理:判断 请求到来时间点 – 窗口长度 范围内数据是否超出阈值。
- 缺点:重复统计、性能问题。
3、(改进)滑动时间窗限流算法
- 特点:固定时间窗+滑动时间窗结合,时间窗分为若干“样本窗口”。
- 原理:每个样本窗口会统计一次数据并记录下来。当一个请求到达时,会统计出当前请求时间点所在样本窗口中的流量数据,然后加上时间窗中其它样本窗口的统计数据,判断是否超出阈值。
核心源码
一、数据统计
核心类:
- StatisticSlot – 统计入口
- DefaultNode – 实际入口
- StatisticNode – 统计节点
- ArrayMetric – 使用数组保存数据的计量器类
- LeapArray – 样本窗口数组(环性数组)
- BucketLeapArray – 重置样本窗口
- WindowWrap – 样本窗口(泛型T为MetricBucket)
- MetricBucket – 统计数据封装类(多维度,维度类型在MetricEvent枚举)
1、StatisticSlot – 统计入口
用于记录、统计不同纬度的 runtime 指标监控信息;(做实时统计)
- 线程数:内部维护一个LongAdder来进行当前线程数的统计,每进一个请求+1,每释放一个请求-1。
- QPS:通过滑动时间窗统计请求数量是否超过阈值。
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 向后传递:调用SlotChain中后续的所有Slot,完成所有规则检测(执行过程中可能回抛出异常:如,BlockException)
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 前面所有规则检测通过:对DefaultNode添加线程数和qps(通过的请求数量:滑动时间窗)
node.increaseThreadNum();
node.addPassRequest(count);
... ...
}
复制代码
2、DefaultNode – 实际入口
统计资源当前入口和全局数据
@Override
public void addPassRequest(int count) {
// 增加当前入口defaultNode统计数据(调用父类StatisticNode)
super.addPassRequest(count);
// 增加当前资源的clusterNode的全局统计数据(背后也是调用父类StatisticNode)
this.clusterNode.addPassRequest(count);
}
复制代码
3、StatisticNode – 统计节点
滑动计数器按 秒/分 分别增加统计数据
// 定义一个使用数组保存数据的计量器:样本窗口数-2、时间窗默认值-1000ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(
SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL
);
@Override
public void addPassRequest(int count) {
// 滑动计数器:秒/分 增加统计数据
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
复制代码
4、ArrayMetric – 使用数组保存数据的计量器类
按秒/分统计数据并记录到当前样本窗口
@Override
public void addPass(int count) {
// 获取当前时间点所在的样本窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 将当前请求计数量添加到当前样本窗口统计数据中
wrap.value().addPass(count);
}
复制代码
5、LeapArray – 样本窗口数组(环性数组)
获取当前时间点所在的样本窗口(LeapArray采用了一个环性数组的数据结构,和一致性hash算法的图类似)
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算:当前时间所在样本窗口id,即在计算数组leapArray中的索引( (timeMillis / windowLengthInMs) % array.length() )
int idx = calculateTimeIdx(timeMillis);
// 计算:当前样本窗口开始时间点(timeMillis - timeMillis % windowLengthInMs)
long windowStart = calculateWindowStart(timeMillis);
while (true) {
// 获取当前时间所在的样本窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 创建时间窗
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) { // cas方式
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 当前样本窗口起始时间=计算出的:说明是同一个样本窗口
return old;
} else if (windowStart > old.windowStart()) {
// 计算出的样本窗口已经过时(环形:已经下一圈):重置原时间窗口(替换老的样本窗口)
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 一般不会出现(时间不会倒流):除非人为修改系统时钟
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
复制代码
6、BucketLeapArray – 重置样本窗口
计算出的样本窗口已经过时(环形:已经下一圈):重置原时间窗口(替换老的样本窗口)
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
// 更新窗口起始时间:仅把数据替换
w.resetTo(startTime);
// 将每个维度统计数据清零
w.value().reset();
return w;
}
复制代码
7、MetricBucket – 统计数据封装类
pass维度增加
public void addPass(int n) {
// pass维度增加
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
复制代码
二、数据使用(qps例子)
核心类:
- DefaultController – 入口
- StatisticNode – 实际入口
- ArrayMetric – 使用数组保存数据的计量器类
- LeapArray – 样本窗口数组(环性数组)
1、DefaultController – 入口
获取当前时间窗已统计数据
private int avgUsedTokens(Node node) {
if (node == null) {
// 未做统计工作:返回0
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
复制代码
2、StatisticNode – 实际入口
获取通过qps数量
@Override
public double passQps() {
// 时间窗场景:当前时间窗中统计的通过的请求数量/时间窗长度
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
复制代码
3、ArrayMetric – 使用数组保存数据的计量器类
汇总pass数据:所有样本窗口
@Override
public long pass() {
// 更新array中当前时间点所在的样本窗口实例中的数据
data.currentWindow();
long pass = 0;
// 获取:当前时间窗口中的所有样本窗口统计的value,记录到result中
List<MetricBucket> list = data.values();
// 汇总pass数据
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
复制代码
4、LeapArray – 样本窗口数组(环性数组)
汇总样本窗口实例:要过滤过时的
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
// 遍历array中每个样本窗口实例,并汇总result
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
// 若当前遍历实例:空/过时
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
// 当前时间与当前样本窗口时间差 > 窗口时间 : 说明过时
return time - windowWrap.windowStart() > intervalInMs;
}
复制代码
参考资料
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END