【一起学RocketMq】消息的存储源码分析

前言

我们设计一个消息的存储服务器,首先会现在内存里一块内存缓冲区,但是内存缓存有限,所以需要定时把消息刷到磁盘或者数据库等持久化介质中。如果存在磁盘中的文件,一个文件肯定是不行的,就算存没问题,那么检索也会超级耗时,过期消息的清理也很麻烦,所以消息需要多文件的存储。
RocketMq就是内存+多文件的存储,每个Broker对应一个MessageStore,专门用来存储接收到的消息。MessageStore只是存储的一个抽象,MessageStore里保存着一个CommitLog,CommitLog维护了一个MappedFileQueue,而MappedFileQueue里又维护了多个MappedFile,每个MappedFile都是磁盘文件的一个映射,可以说MappedFile和每个文件一一对应,这些文件才是真正存储消息的地方。MappedFile的文件名也很有特点,记录了它里面存储的第一条消息的全局物理偏移量。

image.png

探索消息存储

Broker对于每个请求的Code,都注册了对应的处理类,其中用于接收消息的处理类为SendMessageProcessor,这个时机是在Broker启动时。

image.png

1、SendMessageProcessor实现了NettyRequestProcessor接口,并在接口方法processRequest()中处理接收到的请求

image.png

processRequest会调用asyncProcessRequest()

image.png
我们看到请求就是消息发送时候封装的RemotingCommand,根据code和isBatch,我们知道这里调用的是
asyncSendMessage()

2、首先根据请求,封装成内部消息结构MessageExtBrokerInner

image.png

然后调用MessageStore的asyncPutMessage()方法将消息放入MessageStore中。

image.png

MessageStore是个接口,实际上调用的DefaultMessageStore的putMessage方法

image.png

image.png

image.png

3、接下来看看CommitLog.putMessage()都做了什么

首先是设置MessageExtBrokerInner的一些信息

image.png

然后是从MappedFileQueue里面获取需要追加消息的MappedFile,然后是加锁处理并发

image.png

image.png

4、接下来我们看看MappedFile.appendMessage()做了啥。这个方法最终会调用appendMessagesInner,然后根据消息类型会调用AppendMessageCallback#doAppend()方法

image.png
我们分析下appendMessageInner()里的逻辑:
(1)获取到MappedFile中的writeBuffer,如果writeBuffer为空,则获取mappedByteBuffer。在MessageStore初始化时,会初始化一个Buffer缓冲池:TransientStorePool,TransientStorePool在初始化时会初始化若干DirectBuffer,放入一个Deque里,默认池子容量为5。MappedFile的writeBuffer就是从这个池子获取的。mappedByteBuffer就是该内存中的MappedFile和磁盘文件的映射,当追加消息时,会优先追加到writeBuffer。

(2) 调用cb.doAppend()方法,传入了以下几个参数

  • this.getFileFromOffset():MappedFile的全局消息物理偏移量(即MappedFile的第一个消息的全局物理偏移量,也就是文件名)
  • byteBuffer:即MappedFile的内存缓冲区。
  • this.fileSize-currentPos:fileSize是文件大小,currentPos为当前文件已经写到什么位置,两者相减就是当前文件的剩余容量。
  • messageExt:就是封装好的消息。

5、在CommitLog.doAppend()方法中
先计算消息存储的各个属性,如消息长度、消息在消息队列中的长度等

image.png

再判断消息追加后是否超过单个MappedFile的大小,如果超出,则返回状态码END_OF_FILE

image.png

然后将消息存储到缓冲池中,并返回PUT_OK的结果

image.png

整个消息存储到缓冲区的流程如下

image.png

结语

消息存储重在数据结构理解,首先是SendMessageProcess的processRequest()开始,首先会找出MessageStore来putMessage,然后从找出MappedFileQueue找出最新的一个MappedFile(不存在则创建一个),然后获取MappedFile的缓冲区,将消息追加到缓冲区即可。你可能会问消息只是放到了缓冲区,怎么存储到文件的呢?下篇文章一起看消息的持久化-消息刷盘。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享