要散布阳光到别人心里,先要自己心里有阳光。
背景
Saas系统与Ckmro系统之间随着业务交互数量的增加,待轮询数据倍数增长,单线程已经无法满足业务的推进。
- 需要解决的问题:
- 单线程处理速度缓慢
- 待轮询数据权重分化
- 处理过程的异常监听
速度缓慢:待轮询数据的处理速度由单线程来执行过于缓慢,需要多线程共同处理。权重分化:待轮询数据权重如果相同,活跃度不高的数据频繁访问Ckrmo造成资源、时间、宽带浪费。需要根据访问频率来进行权重区分。监听:在数据处理过程中难免会有不定因素的产生,需要远程监听线程的状态并进行动态管理。
业务需求:
技术选型
采用自定义线程池+LRU来完成整个轮询设计。
-
ReentrantLock与StampedLock的比较
- 整体环境是读多写少且非公平,ReentrantLock的弊端显现出来,写线程会发生线程饥饿。假设有一个ReentrantReadWriteLock读写锁,当前是读锁被占有的,且有一个申请写锁的线程被阻塞,因为读锁是不互斥的, 这个时候大量的读操来读取数据,这个时候就会造成那一条申请写锁的线程会一直被阻塞,这就造成了写线程的饥饿。
- 读多写少,StampedLock提供乐观读,可以提高整体并发。
-
LRU数据结构
- 采用HashMap+链表。实现LRU需要一个key数组作为定位,一个双向链表储存值。Java提供有序字典类LinkedHashMap同样可作为LRU的实现,但在多线程的环境下我们要处理线程安全同样为了并发高性能需要自己在操作底层数据结构,故排除。
-
集合的选择
- Collection提供很多实现类,要根据特点选用。如ArrayList底层实现为数组,查询快、增删慢。LinkedList底层实现为链表,查询慢、增删快,HashSet由Hash表来维护唯一性,具有很好的存取和查找性能。
设计实现
待轮询数据结构
要重写equals与hashCode方法才能配合后续的Set集合
public class RARLoopModel implements RARLinkNodeModel {
private String id;
private String value;
private int type;
private int status;
private Date createTime;
// ...
public boolean equals(Object o) {
}
public int hashCode() {
}
}
复制代码
配合业务实现Lru
LRU(last least use)即最近最少使用,选择最近最久未使用的数据予以淘汰。
设计原则是:如果一个数据在最近一段时间没有被访问到,那么在将来它被访问的可能性也很小。也就是说,当限定的空间已存满数据时,应当把最久没有被访问到的数据淘汰。
LRU的底层结构采用Map+双向链表实现,HashMap对于hash表的优化已经做的不错,无需在费力气去实现,双向链表保证节点操作的便利。
主要实现两个方法:get、put
线程安全与高性能:使用读写锁处理线程安全,get、put的操作几乎都要用到写,故全用写锁。锁的粒度自然就是整个双向链表。从ConcurrentHashMap源码我们可以引入分段锁的思想,以CPU数分割LRU capacity将双向链表分割成多条小链表,以key的hash来定位节点被分配到哪个小链表。这样的方式减小了锁粒度,提高并发性。
public class RARLruCache<K, V> implements LRUCache<K, V> {
private RARLruSegment<K, V>[] cacheSegments;
}
复制代码
轮询过程设计
结合业务,淘汰并不是真的淘汰,而是降低带轮询数据的权重。由图可知,按权重等级分为不同队列(HashSet与ArrayList),待轮询数据本身不存储权重值,以本地访问Ckmro获取判断结果为根据(Ckmro已做好安全措施,防止不法数据侵害系统 >_<),分配待轮询数据至不同队列。最后一个队列为后勤队列主要存储权重等级最低,出现异常的待轮询数据。其它队列为权重队列按不同权重等级区分。
这里有个小问题:随着重新分配后队列之间会有重复元素(不同队列都有指向相同数据的元素),以至于最后后勤队列的大小与allSet一样大。如想消除就在重新分配时进行过滤筛选,以高权重覆盖低权重为主进行队列之间的去重(其他线程会被长时间堵塞)。是否需要修改这个问题的依据是待轮询数据的大小。
每个队列都有一个缓存队列(LinkedList)用于数据的更迭,由其他队列产生。而当各队列数据重新分配数据的时候直接替换缓存队列,不需要在线程处理过程中操作队列浪费性能(空间换时间)、减少并发操作集合的风险(当待轮询数据超过1000时,一个缓存队列空间消耗达到1KB,缓存队列的收益开始减少,内存是珍贵的。测试方式是生成快照查看占用内存)。
内存是珍贵的,由于待轮询数据数量大致不会超过1000,故allSet由本地内存管理,如超过则分配至Redis,由缓存集群来缓解压力,不过就消耗CPU与IO宽带。
在代码的运行过程难免会出现系统震荡导致的异常,errorCache(LinkedList)作为异常导致数据执行失败的存放点,在重新分配后防止权重最低的队列。
private Set<RARLoopModel> allSet;
private List<RARLoopModel> levelOne;
private Iterator<RARLoopModel> levelOneIterator;
private List<RARLoopModel> levelOneCache;
private Set<RARLoopModel> levelTwo;
private Iterator<RARLoopModel> levelTwoIterator;
private List<RARLoopModel> levelTwoCache;
private List<RARLoopModel> errorCache;
复制代码
轮询流程
不同队列分配不同数量线程处理,一个定时线程重新分配队列的元素。
在权重队列与后勤队列中,多线程竞争获取元素并处理。权重的意义主要是解决待轮询数据访问频率与处理速度。
读元素:Stamped的优势就出来,乐观读提高不少的性能。只要保证获取元素时禁止添加元素即可。
if (this.levelOneIterator.hasNext()) {
// 在乐观锁获取不到的情况下返回null
long stamp = levelOneLock.tryOptimisticRead();
if (!levelOneLock.validate(stamp)) {
break;
}
if (this.levelOneIterator.hasNext()) {
RARLoopModel next = this.levelOneIterator.next();
return next;
}
}
复制代码
多线程之间相互合作处理问题,各个队列按照下图流程推进。关于judge为false时,数据降级,降级缓存添加元素可改善。
为什么 judge为false时,数据降级?队列的元素是由定时任务线程所分配,先清空队列元素在新增,队列元素=缓存元素+LRU Cache里的元素并去重。如果在每次judge为false不进行数据降级,那定时任务的工作压力就增大,不在是清空元素而是变成直接新增元素并去重,当队列数据量极大时,自然其他线程被堵塞的时间变长。
那这两个方案如何选择呢?根据数据量与实时性,其中数据量要优于实时性思考,因为当前业务对于实时性要求不是很高。现业务数据量较大且需要实时性,故采用judge为false数据降级。
try{
rarLoopModel = safeGetModel(1);
errorModel = rarLoopModel;
} catch (Exception e) {
logger.info("levelOneRunnable iterator " + e);
}
if (rarLoopModel == null) {
return;
}
boolean judge =
judge(rarLoopModel.getValue(), rarLoopModel.getType());
if (!judge) {
safeMoveToCache(rarLoopModel, 2);
return;
}
handler(rarLoopModel.getValue(), rarLoopModel.getType());
RARLoopModel model = levelOneLruCache.get(rarLoopModel.getId());
if (model != null) {
return;
}
// TODO put
RARLoopModel remove =
levelOneLruCache.put(rarLoopModel.getId(), rarLoopModel);
safeRemoveToCache(rarLoopModel, 2);
if (remove == null) {
return;
}
safeMoveToCache(remove, 2);
复制代码
细节处理
在Set集合要移除元素前思考是否需要先contain再remove,答案为否。经测试remove效率比contain快将近一倍,所以直接remove。
Stamped使用乐观锁的时候,要将变量重新拷贝一份在本地栈。
线程池设计
线程池使用面临的核心的问题在于:线程池的参数并不好配置。
一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大。
在轮询正式上线使用的时候,关于数据的处理是异步的,轮询设计只执行数据轮询与数据权重的分配不参与业务上的活动。judge方法的高频 http 调用是IO密集的,消耗着网络资源,内存资源等。为了保证服务的安全,轮询是有休眠的,当队列数据一轮处理下来,队列对应线程进行sleep。
在设计初期一个队列是对应多个线程,大家从一个线程池获取线程资源,一旦服务上线是无法更改的。线上线程数量对队列元素数量是无感的,当队列元素数量猛增时依旧还是那几个线程来处理。
我需要的线程池有这几个特征:它不需要任务队列,因为线程数量跟轮询队列挂钩,一直while(true)的方式执行任务,一个任务对应一个线程,没有其他任务。我的线程处理完自己的队列的数据,本应该堵塞到数据重新分配到队列,但由于权重的问题,我希望线程在处理完自己队列数据后前往权重更高队列处理数据,管这个叫任务偷窃。
第一个问题:线城池动态修改配置
线程池Manager 监听来自配置服务的数据。
@Configuration
@ConditionalOnClass(value = org.springframework.data.redis.core.RedisTemplate.class)
public class RedisMessageListener implements RedisPubSubListener<String, String> {
public static final String CHANNEL = "";
@Bean
public RedisMessageListener RedisMessageListener() {
}
}
复制代码
JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略,以setCorePoolSize为方法例,在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。
对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idle的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务。
线程池构造参数有8个,结合业务我们只需要corePoolSize。worker线程的数量是根据队列元素数量范围而决定,为了系统安全提前设置好阈值。
public void refreshThreadPoolExecutor() {
// 从Redis获取需要更改的数据
...
dynamicThreadPoolProperties.getExecutors().forEach(executor -> {
ThreadPoolExecutor threadPoolExecutor =
threadPoolExecutorMap.get(executor.getThreadPoolName());
// 修改核心线程数
threadPoolExecutor.setCorePoolSize(executor.getCorePoolSize());
// 队列管理、阈值管理
...
});
}
复制代码
还需要保证几个细节:权限校验,只有应用开发负责人才能够修改应用的线程池参数。操作日志:可以查看线程池参数的修改记录,谁在什么时候修改了线程池参数、修改前的参数值是什么。操作监控:创建/修改和删除线程池都会通知到应用的开发负责人。
第二个问题:线程异常的通知
在线程池执行任务的生命周期添加监控能力,帮助开发人员了解线程池状态。
首先思考我们需要什么信息,在不侵害业务的情况下怎么获取?
线程池肯定会关心线程数量与堵塞队列的情况。
轮询获取:开启一个新的线程,while(true)获取堵塞队列的情况,放回线程异常信息等。缺点就是线程情况基本稳定,浪费CPU资源。
惰性获取:只需要获取实现ThreadPoolExecutor提供线程执行前后的方法
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
public interface ThreadPoolAlarmNotify {
void alarmNotify(AlarmMessage alarmMessage);
}
第二个无法实时获取堵塞队列的异常,但是我们的业务不需要堵塞业务所以采用这个方法。当然关于消息发送的目的地:选择钉钉。
第三个问题:业务使用线程池是为了获取并发性,有其他方案代替吗?
对于并发性我想协程会更好。协程是一种用户态的轻量级线程,其拥有自己的寄存器上下文和栈,对于上下文的切换所花销小于线程的开销。
优点:IO密集情况下,性能更佳。与多线程策略无冲突,可结合使用
缺点:Java中缺少成熟的应用。
关于协程池的探索,希望以后有时间试一试。
任务偷窃
我们无法判断时刻关注权重队列元素的数量,为了资源的再利用,即将休眠线程去帮忙一起处理更高权重待轮询数据是个不错的选择。
参考Fork/Join源码
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws;
int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) {
// 根据随机数r定位一个任务队列
if ((q = ws[k]) != null) { // 获取workQueue
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null && // 取base位置任务
q.base == b) {
// 成功获取到任务
if (ss >= 0) {
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1; // 更新base位
if (n < -1)
signalWork(ws, q); // 创建或唤醒工作线程来运行任务
return t;
}
} else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int) c], AC_UNIT); // 唤醒栈顶工作线程
}
// base位置任务为空或base位置偏移,随机移位重新扫描
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1;
r ^= r >>> 3;
r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
if ((k = (k + 1) & m) == origin) { // continue until stable
// 运行到这里说明已经扫描了全部的 workQueues,但并未扫描到任务
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
// 对当前WorkQueue进行灭活操作
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int) c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
复制代码
代码有点长,我们在线程内业务实现。核心思想就是从权重最高队列开始安全获取数据,线程只可允许获取获取比自己权重高的线程。
public void stealTask() {
// 扫描所有队列判断是够还有next
for(){
// 获取元素并执行
// 判断自己队列是否有任务
}
// 无任务执行,休眠
}
复制代码
实践总结
轮询本身就是一件消耗资源的事情,在原来单线程轮询基础上,我们缩短了处理一个周期的速度。
面对业务中使用线程池遇到的问题,从支持并发性问题本身来思考有没有取代线程池的方案,到尝试着去追求线程池参数设置的合理性。但面对业界方案具体落地的复杂性、可维护性以及真实运行环境的不确定性,上面两个问题真不是我现在能实现的。
我能做的是回到线程池参数动态化方向上探索,得出一个且可以解决业务问题的方案,虽然本质上还是没有逃离使用线程池的范畴,但是在成本和收益之间,算是取得了一个很好的平衡。
面对业务中数据权重的分化,以项目业务为基础,结合数据支持才能更清晰明了、有条不紊。以解决问题为导向,在以什么时机划分,划分跨越幅度这些问题上要明确方向。在解决问题上必定拥有相对完善的理论知识体系,必须是能够逻辑自洽,这样的解决方式才更为合理和科学。