深挖技术栈(第一期) 之 Rocket源码分析 《上》

作者:幂哥
复制代码

消息队列作为高并发系统核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。

官网及文档

Github:github.com/apache/rock…
Docs:github.com/apache/rock…

书籍推荐

RocketMQ技术内幕

概述

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

  • 削峰填谷: 主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题

  • 系统解耦: 解决不同重要程度、不同能力级别系统之间依赖导致一死全死

  • 提升性能: 当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统

  • 蓄流压测: 线上有些链路不好压测,可以通过堆积一定量消息再放开来压测

RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

  • 支持严格的消息顺序

  • 支持 Topic 与 Queue 两种模式

  • 亿级消息堆积能力

  • 比较友好的分布式特性

  • 同时支持 Push 与 Pull 方式消费消息

  • 历经多次天猫双十一海量消息考验

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:

  • 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)

  • 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)

  • 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)

  • 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)

  • 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)

  • 支持重复消费(RabbitMQ 不支持,Kafka 支持)

消息队列对比参照表

技术架构

角色:

  • Producer:生产者

  • Consumer:消费者

  • Broker:负责消息存储、投递、查询

  • NameServer:路由注册中心。功能包括:Broker管理、路由信息管理

Topic、Broker和Queue的关系

核心数据结构

心跳超时机制

问题和总结

1、消息发送、存储和消费的过程?

  • 消息发送流程:

  • Broker启动时,向NameServer注册信息客户端调用producer发送消息时,会先从NameServer获取该topic的路由信息。消息头code为GET_ROUTEINFO_BY_TOPIC从NameServer返回的路由信息,包括topic包含的队列列表和broker列表Producer端根据查询策略,选出其中一个队列,用于后续存储消息每条消息会生成一个唯一id,添加到消息的属性中。属性的key为UNIQ_KEY对消息做一些特殊处理,比如:超过4M会对消息进行压缩producer向Broker发送rpc请求,将消息保存到broker端。

  • 消息存储流程:

  • Broker端收到消息后,将消息原始信息保存在CommitLog文件对应的MappedFile中,然后异步刷新到磁盘

  • ReputMessageServie线程异步的将CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中

  • ConsumerQueue和IndexFile只是原始文件的索引信息

  • 消息消费流程

  • 消息队列负载
    同一个消费组内的消费者共同承担其订阅主题下消息队列的消费。
    分配算法:尽量使用:平均分配,平均轮询分配。
    分配原则:同一个消息消费队列同一时间只会分配给同一个消费者,一个消费者可以分配多个消息消费队列。

  • 消息拉取

  • 顺序消息的消费存在消息队列锁:

  • 1.向broker发送锁定该消息队列的请求;

  • 2.锁定成功创建该消息队列的拉去任务;

  • 3.锁定失败则等待其它消费者释放该消息队列的锁。

  • 消费模式:

  • 1.集群模式:轮询或者根据算法分配,消息只会被某一个消费者消费到,可重试。

  • 2.广播模式:每条消息会被订阅了该消息主题的所有消费者消费,不存在重试。

  • 消息确认
    基于ACK确认机制

  • 消费进度管理,记录消息的消费进度

  • 广播模式:存储在消费者本地。

  • 集群模式:存放在服务端broker。

  • 详细可参考:cloud.tencent.com/developer/a…

2、异步刷盘怎么保证数据不丢失?

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。
RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:

  • 1)异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
    优点:性能高
    缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致

  • 2)同步刷盘方式:在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。
    优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致
    缺点:性能比异步的低

  • 同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH, ASYNC_FLUSH。

  • 同步复制、异步复制

  • 如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。

  • 1)同步复制方式:等Master和Slave均写成功后才反馈给客户端写成功状态
    优点:如果Master出故障,Slave上有全部的备份数据,容易恢复,消费者仍可以从Slave消费, 消息不丢失
    缺点:增大数据写入延迟,降低系统吞吐量,性能比异步复制模式略低,大约低10%左右,发送单个Master的响应时间会略高

  • 2)异步复制方式:只要Master写成功即可反馈给客户端写成功状态
    优点:系统拥有较低的延迟和较高的吞吐量. Master宕机之后,消费者仍可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多个Master模式几乎一样
    缺点:如果Master出了故障,有些数据因为没有被写入Slave,而丢失少量消息。

  • 同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。

  • 总结:

  • 消息零丢失是一把双刃剑,要想用好,还是要视具体的业务场景,在性能和消息零丢失上做平衡
    实际应用中,推荐把Master和Slave设置成ASYNC_FLUSH的异步刷盘方式,主从之间配置成SYNC_MASTER的同步复制方式,这样即使有一台机器出故障,仍然可以保证数据不丢。

3、零拷贝实现原理?

零拷贝原理:Consumer 消费消息过程,使用了零拷贝,零拷贝包含以下两种方式:

  • 1、使用 mmap + write 方式 (RocketMQ选择的方式:因为有小块数据传输的需求,效果会比 sendfile 更好)

  • 优点:即使频繁调用,使用小块文件传输,效率也很高;

  • 缺点:不能很好的利用 DMA 方式,会比 sendfile 多消耗CPU,内存安全性控制复杂,需要避免 JVM Crash 问题。

  • 2、使用 sendfile 方式

  • 优点:可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题;

  • 缺点:小块文件效率低亍 mmap 方式,只能是 BIO 方式传输,不能使用 NIO。

4、新应用怎么开始消费位点?

一个新的消费组订阅一个已存在的Topic主题时,消费组是从该Topic的哪条消息开始消费呢?

首先翻阅DefaultMQPushConsumer的API时,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼帘,从字面意思来看是设置消费者从哪里开始消费。

  •  CONSUME_FROM_MAX_OFFSET
    从消费队列最大的偏移量开始消费。

  • CONSUME_FROM_FIRST_OFFSET
    从消费队列最小偏移量开始消费。

  • CONSUME_FROM_TIMESTAMP
    从指定的时间戳开始消费,默认为消费者启动之前的30分钟处开始消费。可以通过DefaultMQPushConsumer#setConsumeTimestamp。

  • 对于一个新的消费组,无论是集群模式还是广播模式都不会存储该消费组的消费进度,可以理解为-1,此时就需要根据DefaultMQPushConsumer#consumeFromWhere属性来决定其从何处开始消费,首先我们需要找到其对应的处理入口。我们知道,消息消费者从Broker服务器拉取消息时,需要进行消费队列的负载,即RebalanceImpl。
    结论:如果在生产环境下,一个新的消费组订阅一个已经存在比较久的topic,设置CONSUME_FROM_MAX_OFFSET是符合预期的

详情参考:www.jianshu.com/p/d8b73e3c6…

5、顺序消息实现原理?

在RocketMQ中提供了基于队列(分区)的顺序消费。RocketMQ中顺序性主要指的是消息顺序消费。RocketMQ 中每一个消费组一个单独的线程池并发消费拉取到的消息,即消费端是多线程消费。而顺序消费的并发度等于该消费者分配到的队列数。

  • RokcetMQ的完成顺序性主要是由3把琐来实现的

  • 1、消费端在启动时首先会进行队列负载机制,遵循一个消费者可以分配多个队列,但一个队列只会被一个消费者消费的原则。

  • 2、消费者根据分配的队列,向 Broker 申请琐,如果申请到琐,则拉取消息,否则放弃消息拉取,等到下一个队列负载周期(20s)再试。

  • 3、拉取到消息后会在消费端的线程池中进行消费,但消费的时候,会对消费队列进行加锁,即同一个消费队列中的多条消息会串行执行

  • 4、**在消费的过程中,**会对处理队列(ProccessQueue)进行加锁,保证处理中的消息消费完成,发生队列负载后,其他消费者才能继续消费。

  • 前面2把琐比较好理解,最后一把琐有什么用呢?

  • 例如队列 q3 目前是分配给消费者C2进行消费,已将拉取了32条消息在线程池中处理,然后对消费者进行了扩容,分配给C2的q3队列,被分配给C3了,由于C2已将处理了一部分,位点信息还没有提交,如果C3立马去消费q3队列中的消息,那存在一部分数据会被重复消费,故在C2消费者在消费q3队列的时候,消息没有消费完成,那负载队列就不能丢弃该队列,就不会在broker端释放琐,其他消费者就无法从该队列消费,尽最大可能保证了消息的重复消费,保证顺序性语义。

6、当消费者群组有新的消费者加入或者有消费者宕机,如何处理?

  • RebalanceService线程默认每20秒进行一次消息队列负载,根据当前消费组内消费者个数与主题队列数量按照某一种负载算法进行队列分配。

后续计划

  • ?offset存储原理

  • ?数据结构

  • ?事务消息、消息回查机制

  • ?延时消息

  • ?broker、producer、consumer增减影响

  • ?push、pull逻辑

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享