这一讲,我们主要来讲延迟消息。
这一次我们结合业务来讲。
业务背景
在电商中,下单后,有一些用户付款后,会主动退款。也有一些用户下单后,未付款。但是这部分未付款的订单,会占用着商品库存。
我们目前的电商App,下单后,会在订单表创建对应的订单数据。这些订单的状态,有一些是未付款的,但是未付款的订单占用着商品库存。为了让商品库存能正常恢复,我们现在的处理方案是:
- 启动一个定时任务,每30分钟,定时扫描一遍订单表
- 如果订单是已付款,则跳过,不处理
- 如果订单是未付款,但未超过30分钟,不处理
- 如果订单是未付款,且超过30分钟,就取消订单
(补充:取消订单,其实就是下单的逆向流程)
方案缺点
这个方案有什么缺点?
- 第一,每次定时任务去扫描全部订单,但是订单未付款且超时30分钟的只有一小部分。就是做很多无用功。
- 第二,如果订单表的数量超级超级大,这个时候,扫描的时间巨长,浪费cpu资源。
- 第三,这样子频繁查询数据库,给数据库造成很多不必要的压力。
解决方案
那针对上述的缺点,我们有没有好的解决方案:
- 第一,避免扫描全表
- 第二,谁没付款,就去取消谁,不要做多余的动作
- 第三,要保证近实时取消订单。(近实时:1s左右)
说了这么多,我摊牌了,不装了,就是为了引入RocketMQ的延迟消息
简单总结一下:创建订单的时候,发送一条延时30分钟的消息。到30分钟后,消费者拿到信息,再去判断订单是否已付款,如果付款就跳过不处理,没付款,那就取消订单。
这种方案:没有多余的扫描数据库操作;谁没付款,就去取消谁。多好呀!在生产上,赶紧用起来。
生产者
上面,介绍的都是方法论,下面就是具体的实操环节了。
下面,简单用一个demo介绍一下生产者
public class Producer {
public static void main(String[] args) throws Exception{
//生产者组
DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
//设置nameserver
producer.setNamesrvAddr("localhost:9876");
//启动生产者
producer.start();
//构建消息
Message message = new Message("delayTopic","TagA","delayMessage".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 重点:设置延迟级别
message.setDelayTimeLevel(3);
// 发送消息
SendResult sendResult = producer.send(message);
// 打印发送结果
System.out.println("发送结果:"+sendResult);
// 关闭生产者
producer.shutdown();
}
}
复制代码
这里强调一下,不是延迟发送哈,是延迟消费。发送是立马就发送的,只是消费的时候,延迟30分钟。
补充知识点
延迟级别是从1开始的,不是从0开始。然后你可能会发现,最多延迟2小时。如果你想延迟3小时,对不起,RocketMQ不支持。告辞!!!
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
// 消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer_group");
//注册nameserver
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("delayTopic","TagA");
// 开启消费offset
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < list.size(); i++) {
MessageExt messageExt = list.get(i);
String msg = new String(messageExt.getBody());
//这里主要打印延迟时间≈当前时间-消息的生产时间
System.out.println(msg+" 时间="+(System.currentTimeMillis()-messageExt.getBornTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
复制代码
总结:延迟消费者和普通的消费者相同,一毛一样。延迟消息的核心点:生产者多了一个延迟级别。
知其然知其所以然
上面,你已经知道怎么使用了。
如果面试官问你:RocketMQ的延迟消息底层原理是什么?
那你接着看下去。
看图说话。
-
第一,生产者发送的消息,因为带了延迟级别,因此会被分发到叫SCHEDULE_TOPIC_XXXX的Topic中。里面有18的队列,一个队列对应着一个延迟级别。比如queueId=delayLevel-1。
-
第二,定时器,每100毫秒,扫描所有延迟级别里面的延迟消息message,如果消费时间已经大于当前时间,那定时器就会把延迟消息message,发送到真正的topic(就是代码写的topic,比如上面代码的:delayTopic),根据负载均衡策略,把message发送到具体某个队列。
-
第三,有消息后,消费者进行消息和后续处理。
上面这里,是一个总体流程图。
然后,我们对照代码,来进一步深刻认识一下。其实,就是加深理解。
第一步:生产者发送的消息到SCHEDULE_TOPIC_XXXX的topic
org.apache.rocketmq.store.CommitLog#putMessage
//真正的topic
String topic = msg.getTopic();
//真正的队列Id
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 延迟级别大于0
if (msg.getDelayTimeLevel() > 0) {
// 如果延迟级别大于最大延迟级别,那就把延迟级别设置为最大延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 延迟topicSCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 根据延迟级别,获取队列id
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 消息的topic设置为延迟topic,不是设置真正的topic
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
省略部分封装msg的代码..
//最后把msg追加到mappedFile上,mappedFile这个后续再讲,在这里你把它当做一个文件即可
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
复制代码
第二步:定时器扫描信息
- 1,org.apache.rocketmq.store.schedule.ScheduleMessageService#start
public void start() {
//通过AtomicBoolean 来确保 有且仅有一次执行start方法
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 遍历所有 延迟级别
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
// key为延迟级别
Integer level = entry.getKey();
// value 为 毫秒数
Long timeDelay = entry.getValue();
// 根据延迟级别 ,获取对应的offset
Long offset = this.offsetTable.get(level);
//
if (null == offset) {
offset = 0L;
}
// 为每个延迟级别创建定时任务,开始执行定时任务,1S后开始执行
if (timeDelay != null) {
// 第二步:具体核心执行逻辑在DeliverDelayedMessageTimerTask-->executeOnTimeup()
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 延迟10秒后执行定时任务,flushDelayOffsetInterval=10s,周期也是10秒执行一次
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
//持久化每个队列的消费offset
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
复制代码
2,org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
public void executeOnTimeup() {
//根据延迟级别和topic:RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";来找到对应的ConsumeQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// 消费偏移量
long failScheduleOffset = offset;
if (cq != null) {
// 根据消费偏移量从消息队列中获取所有有效消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// 遍历所有消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 获取消息的物理偏移量
long offsetPy = bufferCQ.getByteBuffer().getLong();
// 获取消息的物理长度
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
//当前时间
long now = System.currentTimeMillis();
//消费时间
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
//下一个偏移量
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
//如果消费时间<当前时间,说明应该被消费了
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
//根据物理偏移量和长度,获取消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//构建真正 的消息
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 重新把消息发送到真正的消息队列上
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
...省略一堆不太重要的代码
}
//这里又重新添加一个新的任务,这次是100毫秒
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
复制代码
第三步: 消费者后续处理(略)
最后用一张图来总结
好了,写完了,下期见,拜拜。
有问题的话,欢迎留言交流。
每日一问
RocketMQ不支持自定义延迟时间,那Kafka支持延迟消息吗?如果支持,支持自定义延迟时间吗?如要你实现自定义延迟时间,你会怎么实现?说说你的思路
欢迎留言
后续文章
- RocketMQ-入门(已更新)
- RocketMQ-架构和角色(已更新)
- RocketMQ-消息发送(已更新)
- RocketMQ-消费信息
- RocketMQ-消费者的广播模式和集群模式(已更新)
- RocketMQ-顺序消息(已更新)
- RocketMQ-延迟消息(已更新)
- RocketMQ-批量消息
- RocketMQ-过滤消息
- RocketMQ-事务消息
- RocketMQ-消息存储
- RocketMQ-高可用
- RocketMQ-高性能
- RocketMQ-主从复制
- RocketMQ-刷盘机制
- RocketMQ-幂等性
- RocketMQ-消息重试
- RocketMQ-死信队列
…
欢迎各位入(guan)股(zhu),后续文章干货多多。