欢迎搜索关注同名微信公众号【Coder的技术之路】,后台提供历史文章整理版PDF下载,欢迎收藏,欢迎讨论
背景
我们的支付场景下,要求消费的业务消息绝不能丢失,且能充分利用高规格的服务器的性能,比如用线程池对业务消息进行快速处理。有同学可能没太理解这个问题有啥不好处理,让我一步步分析下。
MQ的优势和缺点
MQ是我们在应对高并发场景最常用的一种措施,它可以帮我们对业务解耦、对流程异步化以及削峰填谷的妙用。
但是,由于引入了这一额外的中间件,也增加了系统的复杂度和不稳定因素。
消息可靠性的应对
消息的可靠性保证需要从消息流转的每个环节进行保障,比如生产端的事务型消息,broker的实时刷盘持久化,消费端的手动ACK 。
这里,我们对生产端和存储端的保障措施不作讨论,重点关注消费端的手动ACK机制。
手动ACK的问题
手动ACK可以保证消息一定被消费,但是需要确保手动ACK的顺序和消息顺序一致,为什么?
消息队列之所以性能高处理快,是因为采用了文件顺序读写方式,系统在拉取消息进行消费时,是按顺序文件的offset进行拉取的,如果commit offset的顺序错乱,会使得服务端的消息状态错乱,比如消息重发。
因此,如果我们在本地启动了线程池,对消息进行拉取处理,由于各线程的处理速度不一定一致,所以无法保证各线程处理完之后对各自消息的ACK操作是顺序的,怎么办,难道只能同步拉消费取然后ACK么。
解决方案
最不济,可以提交一批任务,批量等待统一提交。不过总觉得不优雅。
某次看JUC中的AQS的时候,启发了我。
我们平时用的类似CountDownLauch这些并发工具类,不也是处理的多线程协作的问题么。
我们的场景完全没有AQS复杂,借鉴它的思路,应该是没有问题的。
-
创建双端队列,队列节点中需要维护自身处理状态state,和对应msg的offset。
-
服务从消息中心拉取消息,在提交本地线程池执行之前,先入队列。
-
消息消费完之后,通知队列中对应的节点,更新状态为完成。
-
队列头被更新后出队列,提交offset,并判断新的队列头的状态,直到遇到state是未完成的head时阻塞。
undefined
方案解析
该方案可以有效利用本地线程的资源,并行的处理,并通过队列和异步通知机制保证最终commit offset时有序。
在最差情况下(即head节点对应的msg最后一个被处理完),相当于等待一批线程处理完成后统一提交。除此之外等待性能都要更优。
异步通知的实现
public class MSGFuture {
/*全局变量,存放msg对应的future对象*/
private static final Map<Long, MSGFuture> FUTURES = new ConcurrentHashMap<Long, MSGFuture>();
/*全局不变唯一标识*/
private final long id;
/*最长等待时间*/
private final int timeout;
/*并发锁*/
private final Lock lock = new ReentrantLock();
/*通知条件*/
private final Condition done = lock.newCondition();
/*开始时间*/
private final long start = System.currentTimeMillis();
/*业务结果*/
private volatile Object response;
}
复制代码
//构造函数
public MSGFuture(Request request, int timeout) {
/*全局自增ID*/
this.id = request.getrId();
/*超时时间*/
this.timeout = timeout > 0 ? timeout : 1000;
/*放入全局变量*/
FUTURES.put(id, this);
}
复制代码
//业务处理结果更新
public static void received(long id, Object response) {
MSGFuture future = FUTURES.remove(id);
if (future != null) {
future.doReceived(response);
} else {
logger.warn("response return timeout,id:"+id);
}
}
复制代码
//结果更新,通知等待条件
private void doReceived(Object res) {
lock.lock();
try {
response = res;
done.signal();
} finally {
lock.unlock();
}
}
复制代码
//异步等待获取结果
public Object get(int timeout) throws TimeoutException {
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
}
return returnFromResponse();
}
复制代码
总结
看到这里,有同学会说,这个和AQS有啥关系呀~
其实,只是处理思路的一种借鉴,比如state状态,比如锁机制和通知等待。既然都是多线程任务协调,那总有相似之处。
总之一句话,别说背八股文没用,多多了解会有大帮助~