RocketMQ之文件解析 (二)

文件解析

RocketMQ其实属于面向文件的编程 数据全部都是基于文件进行编码读取 以及 状态等

比如我们的kernel内核是一个虚拟文件系统 还有 zookeeper也是一个虚拟目录结构(内存结构)

采用文件IO流的方式 我们也知道 内存效率 > 磁盘效率 > 网络IO效率 所以内存效率是最高的,为什么RocketMQ要使用磁盘方式去设计呢?

RocketMQ的数据都是直接落地到磁盘的 读取 写入都是基于文件IO操作 大家都知道文件IO操作其实就是靠内核来完成的,所以这一块就是依赖内核中的一些设计去优化整个IO的效率 典型的RandomAccessFile中map方法就是靠内核中的mmap去实现的,RocketMQ中有两种落盘方式同步刷盘异步刷盘 这一块在前面笔者简单描述过 本篇文章主要了解下 具体文件存储数据格式

Rocket文件存储

RocketMQ存储的路径为${ROCKET_HOME}/store的目录下

rocketmq@master-35:~/store$ ll
-rw-rw-r-- 1 rocketmq rocketmq    0 8月  25 16:37 abort
-rw-rw-r-- 1 rocketmq rocketmq 4096 9月   7 16:26 checkpoint
drwxrwxr-x 2 rocketmq rocketmq 4096 8月  25 16:36 commitlog/
drwxrwxr-x 2 rocketmq rocketmq 4096 9月   7 16:27 config/
drwxrwxr-x 3 rocketmq rocketmq 4096 8月  25 16:32 consumequeue/
drwxrwxr-x 2 rocketmq rocketmq 4096 8月  25 16:32 index/
-rw-rw-r-- 1 rocketmq rocketmq    4 8月  25 16:37 lock
复制代码

主要存储文件以及目录包含以下几个

  1. abort 用来记录整个broker是否为安全关闭的一个状态标识 在运行起来的时候会创建abort文件 安全关闭的情况下会删除 abort文件
  2. checkpoint 记录 commitlog,consumequeue, index 文件最后刷盘时间戳
  3. commitlog 存储所有消息的一个文件夹 该文件夹下包含当前broker接受的所有消息 下面会详细介绍commitlog存储消息的格式
  4. config broker运行的一些配置文件
  5. consumequeue topic对应消费队列的存储 每一个topic下面为每一个queue创建了一个目录目录名称为queue的id 下面会详细描述consumequeue的结构和作用
  6. index 消息索引 存储的是带有key的Message 下面为详细描述Index文件的作用以及结构
  7. lock 运行期间用到的全局锁

CommitLog

写入方式

MessageStore的接口定义了几种PutMessage的方式


// 异步写入返回CompletableFuture
default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    return CompletableFuture.completedFuture(putMessage(msg));
}

// 异步批量写入 返回CompletableFuture
default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
    return CompletableFuture.completedFuture(putMessages(messageExtBatch));
}

// 同步写入直接返回PutMessageResult
PutMessageResult putMessage(final MessageExtBrokerInner msg);

//同步批量写入PutMessageResult
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
复制代码

RocketMQ中实现类为DefaultMessageStore 在上一篇文章中笔者有介绍过同步刷盘异步刷盘的逻辑以及异步编程模型CompletableFuture和部分源代码的实现同步双写,需要补充的一点是Rocketmq中写入消息都是以顺序的方式写入的 这样写入方式效率是最高的。

在这里我们主要介绍写入文件的数据格式和写入机制以及过期清理和复制机制

数据格式

commitlog目录下存储的就是对应具体消息,commitlog文件名称是由20个十进制的数字组成的 表示当前commitlog文件起始的文件偏移量 第一个文件肯定是00000000000000000000,每一个commitlog文件大小都是固定的1G,但是对应的内容是小余等于1G,消息为一个整体单元不可分割所以是小余等于1G的。

image.png

commitlog存储的是所有消息不区分Topic,commitlog数据结构如下

属性 描述 类型
msgLen 消息长度 Int
bodyCrc checksum 校验 Int
queueId 队列id Long
flag 用户自定义 Int
queueOffset 队列偏移量 Long
physicalOffset 磁盘文件偏移量 Long
sysFlag 用来计算FilterType和事务状态 Int
bornHostTimestamp 消息创建时间 Long
bornHost 生产者Host, ipv4: IP(4)+Port(4) 8 , Ipv6: IP(16)+Port(4) 8 or 20
storeTimeStamp 消息存储时间 Long
storehostAddressLength broker地址 8 or 20
reconsumeTimes 回收时间 Int
preparedTransactionOffset 预处理事务消息的偏移量 Int
bodyLength 消息体长度 Int
body 消息体 bodyLength
topicLength 消息归属Topic
propertiesLength 配置信息

对应Rocket源码中的MessageExt

private String brokerName;

private int queueId;

private int storeSize;

private long queueOffset;
private int sysFlag;
private long bornTimestamp;
private SocketAddress bornHost;

private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
private long commitLogOffset;
private int bodyCRC;
private int reconsumeTimes;

private long preparedTransactionOffset;

复制代码

对应编码之后的Buffer代码

protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
    ....
    // Initialization of storage space
    this.resetByteBuffer(encoderBuffer, msgLen);
    // 1 TOTALSIZE
    this.encoderBuffer.putInt(msgLen);
    // 2 MAGICCODE
    this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.encoderBuffer.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.encoderBuffer.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.encoderBuffer.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET, need update later
    this.encoderBuffer.putLong(0);
    // 7 PHYSICALOFFSET, need update later
    this.encoderBuffer.putLong(0);
    // 8 SYSFLAG
    this.encoderBuffer.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.encoderBuffer.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
    // 11 STORETIMESTAMP
    this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
    // 13 RECONSUMETIMES
    this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.encoderBuffer.putInt(bodyLength);
    if (bodyLength > 0)
        this.encoderBuffer.put(msgInner.getBody());
    // 16 TOPIC
    this.encoderBuffer.put((byte) topicLength);
    this.encoderBuffer.put(topicData);
    // 17 PROPERTIES
    this.encoderBuffer.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.encoderBuffer.put(propertiesData);

    encoderBuffer.flip();
    return null;
}
复制代码

写入机制

之前通过asyncPutMessage我们简单分析过异步编程模型和4.7版本之后的优化,现在我们主要看下写入机制以及文件创建的时机。

发送消息请求

// 解析消息头 包含消息生产者发送的一些信息以及topic和队列信息
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
    return CompletableFuture.completedFuture(null);
}
// 消息的上下文 可以用于跟踪消息trace 基本上就是reqeustHeader中的一些信息
mqtraceContext = buildMsgContext(ctx, requestHeader);
//注册的一些消息hook
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
    //批量消息
    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
    //单条消息
    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
复制代码
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                            SendMessageContext mqtraceContext,
                                                            SendMessageRequestHeader requestHeader) {
  
  
 
   int queueIdInt = requestHeader.getQueueId();
   //获取topic的配置信息
   TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    ....
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    ... //省去填充Message数据部分

    CompletableFuture<PutMessageResult> putMessageResult = null;
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        //事物消息存储
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        //普通消息存储
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
复制代码

getMessageStore的实现默认为DefaultMessageStore 最后实现消息存储的为CommitLogDLedgerCommitLog,我们主要看下CommitLog的实现DLedgerCommitLog在后面的Raft协议实现上会详细描述

处理消息存储

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {

     ... //省略判断以及填充逻辑
     
    //获取消息的编码器
    PutMessageThreadLocal putMessageThreadLocal =  this.putMessageThreadLocal.get();
    //将消息进行编码 对应实现在`MessageExtEncoder`中 主要将上面说的消息结构 编码成bytebuffer
    PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
    //将编码结果赋值给msg的EncodeBuffer
   msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);

    //写入锁 这里的锁可以通过配置文件使用 CAS实现的自旋锁 或者ReentrantLock
    putMessageLock.lock(); 
    try {
        //获取对应的MappedFile 最后一个 没有即为空
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
           ....
          
        if (null == mappedFile || mappedFile.isFull()) {
            //当mappedfile为空 或者已经写满了 则新创建一个mappedfile
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
        }
         .....
         // 将消息追加到mappedFile中
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        switch (result.getStatus()) {
           //....
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // 写入文件过大 超过整个mappedFile的大小 新创建文件再次写入 commitlog文件尾部留有默认长度为 8个字节
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                //....
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
           
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

复制代码

通过上面代码我们可以看出来 文件创建的时机有两种,一种是第一次消息存储 内存中没有对应的mappedFile文件 则通过mappedFileQueue.getLastMappedFile(0)方式创建新的mappedFile文件, 第二种是当前最新的mappedFile已经无法写入新的消息了 则创建新的mappedFile文件.

MappedFile文件创建


if (mappedFileLast != null && mappedFileLast.isFull()) {
    createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}

if (createOffset != -1 && needCreate) {
    // nextFilePath 就是最后一个文件起始的offset + 最后一个文件的大小
    String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    
    String nextNextFilePath = this.storePath + File.separator
        + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
    MappedFile mappedFile = null;
    ..... //省略部分逻辑
       mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
            nextNextFilePath, this.mappedFileSize)

   
}
复制代码

通过AllocateMappedFileService去创建MappedFile

private boolean mmapOperation() {
       ....
       // 从队列中获取数据
        req = this.requestQueue.take();
         ...//省略部分过程

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();

            MappedFile mappedFile;
    
           // 判断是否开启 isTransientStorePoolEnable ,如果开启则使用堆外内存进行写入数据,最后从堆外内存中 commit 到 FileChannel 中。
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                //使用mmap(RandomAccessFile#channel#map方法创建mappedByteBuffer)的方式创建MappedFile
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }

            // 判断文件大小是否大于等于1G只有commitlog才是1G文件 这里其实就是判断是否为 commitlog文件 数据预热 填充对应pagecache的数据 将mmap的数据内存映射提前准备好
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMappedFileSizeCommitLog()
                &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }

            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
   ...


复制代码

这里有几个关键点需要说明下 isTransientStorePoolEnable以及 warmMappedFile方法
isTransientStorePoolEnable
这个必须要在异步刷盘而且为master节点上才能开启, 主要是将数据写入到堆外内存中 然后通过批量commit写入到FileChannel中 这会影响到 复制到Slave节点,复制到Slave节点的数据都是已经commit的数据才会复制,所以可能会导致slave节点的数据有延迟 这个延迟受commitIntervalCommitLog,commitCommitLogThoroughInterval 默认为200ms配置影响。

warmMappedFile

warmMappedFile 主要是做数据预热,可以通过下面代码看到 在kernel内核中 默认的pagecache size大小为4K,所有程序在读取文件和写入文件 都是会先将数据写入到pagecache 或者 从pagecache中读取数据,这里的预热是为了 先将内存大小分配好 方便后面直接写入避免再分配pagecache的过程导致缺页的异常,将内存数据都写入0 是为了占位 按照pagecache的大小循环 也就是4k 在每一页都写入0 占用对应的pagecache
这里还区分了下 如果是同步刷盘 那么总共1G会使用多少页 针对多少页做一次刷盘操作 在最后再做一次刷盘

public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }

        // prevent gc
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }

    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime);
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
        System.currentTimeMillis() - beginTime);

    this.mlock();
}
复制代码

mlock

因为使用的是堆外内存DirectBuffer是可以获取到buffer所对应的物理内存地址,通过mlock可以将当前DirectBuffer使用的内存,防止被交换到swap空间,这样做的好处就是在程序频繁读取或者写入数据的时候 效率会提高 因为物理内存始终不会释放被当前进程所持有。

madvise
madvise会向内核提供一个针对于于地址区间的I/O的建议,内核可能会采纳这个建议,会做一些预读的操作。MADV_WILLNEED预计在不久的将来访问,内核会预读取一些数据到pagecache中。

比如我们读取一个文件的时候 都是通过pagecache读取 pagecache对应到一个文件描述符,文件描述符中对应一个真实的物理磁盘地址,那么如果没有pagecache,系统就会触发一个缺页异常 建立pagecache并且绑定到一个文件描述符中,然后将数据回写到pagecache pagecache才返回给应用程序对应的进程,这样整个操作时间会比较长 因为io处理耗时比较高, 所以使用madvise可以让内核预读取一个地址区间的数据到pagecache中 减少了缺页的异常,从而提高整个系统读取的效率。

public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }

    {
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}
复制代码

文件过期时间

消息是被顺序存储在commitlog文件中的,commitlog中因为每条消息的大小是不固定的 所以消息清理是按照时间单位进行清理的

DefaultMessageStore中会创建一个schedule 每10s中检查一次过期文件

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
复制代码

清理文件包含commitLogconsumeQueue

private void cleanFilesPeriodically() {
    this.cleanCommitLogService.run();
    this.cleanConsumeQueueService.run();
}
复制代码

commitlog中判断删除条件如下

private void deleteExpiredFiles() {
    int deleteCount = 0;
    // 配置文件中的过期时间 小时为单位
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    
     ....
     //清理时间达到 默认为凌晨4点 deleteWhen
    boolean timeup = this.isTimeToDelete();
    // 磁盘空间占用率 默认为75%
    boolean spacefull = this.isSpaceToDelete();
    // 手动删除
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
   
    if (timeup || spacefull || manualDelete) {

        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
            fileReservedTime,
            timeup,
            spacefull,
            manualDeleteFileSeveralTimes,
            cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;

        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}
复制代码
private final double diskSpaceWarningLevelRatio =
    Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));

private final double diskSpaceCleanForciblyRatio =
    Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
复制代码

默认过期时间为72小时也就是3天,除了我们自动清理,下面几种情况也会自动清理 无论文件是否被消费过都会被清理

1、文件过期并且达到清理时间 默认是凌晨4点,自动清理过期时间
2、文件过期 磁盘空间占用率超过75%后,无论是否到达清理时间 都会自动清理过期时间
3、磁盘占用率达到清理阈值 默认85%后,按照设定好的清理规则(默认是时间最早的)清理文件,无论是否过期
4、磁盘占用率达到90%后,broker拒绝消息写入

ConsumeQueue

ConsumeQueue中存储的是Topic消费队列目录结构为
topic/queueid/offet如下

└── TopicTest
    ├── 0
    │   └── 00000000000000000000
    ├── 1
    │   └── 00000000000000000000
    ├── 2
    │   └── 00000000000000000000
    └── 3
        └── 00000000000000000000
复制代码

在每一个topic下面为我们每一个queue都创建了对应的目录,目录名称为queue的id,queue下面存储的目录名称和commitlog格式是一样的 都是20个十进制的数字组成 标识当前queue的偏移量 区别的是每一个文件名称都是固定的 因为consumequeue的文件大小是固定的 意思就是不同queue下面的文件名称是一致的。

String queueDir = this.storePath
    + File.separator + topic
    + File.separator + queueId;

this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
复制代码

作用

Rocketmq是通过订阅Topic来消费消息的,但是因为commitlog是不区分topic存储消息的,如果消费者通过遍历commitlog去消费消息 那么效率就非常低下了,所以设计了ConsumeQueue用来存储Topic下面每一个队列中消费的offset可以将这里理解为一个队列消息对应的索引文件。

数据格式

image.png

单条ConsumeQueue固定的大小是占用20个字节,所以每一个Consume的大小都是固定的 Consume单个文件的大小也是固定的 总共是30万条数据。

public static final int CQ_STORE_UNIT_SIZE = 20;

// ConsumeQueue file size,default is 30W
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
复制代码

从存储数据格式我们就可以猜出来对应的ConsumeQueue文件就是用来查询对应队列中消息数据本章节我们主要看下 ConsumeQueue存储机制和如何通过ConsumeQueue定位到具体消息以及消息的消费偏移量如何处理和如果通过时间查找对应的消息。

创建时机

创建ConsumeQueue的时机其实是通过查询消息的时候 consumeQueueTable中没有当前topic#queueid的时候就会创建一个对应的的mappedFile文件

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
   ....
   ConsumeQueue logic = map.get(queueId);
   if (null == logic) {
       //若当前内存中没有对应topic下的consumequeue 则创建对应的consumequeue
       ConsumeQueue newLogic = new ConsumeQueue(
           topic,
           queueId,
           StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
           this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
           this);
       ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
       if (oldLogic != null) {
           logic = oldLogic;
       } else {
           logic = newLogic;
       }
   }

   return logic;
}
复制代码

调用findConsumeQueue的位置有很多比如通过Topic和queueId获取某个队列消息的时候会使用到

数据恢复

在程序启动的时候会触发DefaultMessageStore#load的方法将本地磁盘中的数据 load到内存将磁盘中的状态恢复到程序内存中。

public boolean load() {
        ...
        // load Commit Log
        result = result && this.commitLog.load();

        // load Consume Queue
        result = result && this.loadConsumeQueue();
        ...
}


private long recoverConsumeQueue() {
    long maxPhysicOffset = -1;
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            logic.recover();
            if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                maxPhysicOffset = logic.getMaxPhysicOffset();
            }
        }
    }

    return maxPhysicOffset;
}


复制代码

可以看到在DefaultMessageStore#load方法有一个方法loadConsumeQueue的方法

private boolean loadConsumeQueue() {
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    //获取Store consumequeue目录下所有子目录 加载进来
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {
        
        for (File fileTopic : fileTopicList) {
            String topic = fileTopic.getName();

            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId;
                    try {
                        queueId = Integer.parseInt(fileQueueId.getName());
                    } catch (NumberFormatException e) {
                        continue;
                    }
                    ConsumeQueue logic = new ConsumeQueue(
                        topic,
                        queueId,
                        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                        this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                        this);
                    this.putConsumeQueue(topic, queueId, logic);
                    
                }
            }
        }
    }

    log.info("load logics queue all over, OK");

    return true;
}
复制代码

load完之后会通过recover将数据初始化到堆外内存中

public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {

        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        int mappedFileSizeLogics = this.mappedFileSize;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        long maxExtAddr = 1;
        while (true) {
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE)          {
                ....//省略文件读取
                //记录最大物理偏移
                this.maxPhysicOffset = offset + size;

                .....
         }
        }
        //记录当前文件的offset
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        if (isExtReadEnable()) {
            this.consumeQueueExt.recover();
            log.info("Truncate consume queue extend file by max {}", maxExtAddr);
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
        }
    }
}
复制代码

获取消息

因为消费消息整体流程还是比较长的 后续会通过解析PullMessageProcessor源码来具体分析整个消费链路,本篇文章主要分析下consumeQueue的作用 所以主要是围绕consumeQueue这个类来分析下consumeQueue的功能.

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
   int mappedFileSize = this.mappedFileSize;
   //通过startIndex * 单条消息的大小定位offset
   long offset = startIndex * CQ_STORE_UNIT_SIZE;
   if (offset >= this.getMinLogicOffset()) {
       MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
       if (mappedFile != null) {
           SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
           return result;
       }
   }
   return null;
}
复制代码

以上代码对应ConsumeQueue#getIndexBuffer方法,该方法主要通过startIndex来获取消费消息的条目,因为每个条目的大小是固定的 所以只需要根据index*20则可以定位到具体的offset的值,就可以知道具体条目的数据SelectMappedBufferResult也就是开始消费的条目 然后往后消费,具体代码在DefaultMessageStore#getMessages方法中.

...
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
    int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
    long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
    ....
    
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

    }
复制代码

我这里摘出部分代码 只是为了说明这边consumeQueue的一个逻辑,上面已经说过每一个条目中包含commitlogOffset,size,tagCode三个数据,那么ommitLog.getMessage(offsetPy, sizePy)获取对应位置上的消息.

根据时间获取消息

ConsumeQueue#getOffsetInQueueByTime中实现了对消息进行时间偏移读取,但是在consumeQueue并没有记录消息的时间,所以避免不了去commitlog中获取对应消息的偏移量,我们看下consumeQueue是如何实现的。

public long getOffsetInQueueByTime(final long timestamp) {
    MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
    if (mappedFile != null) {
        long offset = 0;
        int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
        int high = 0;
        int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
        long leftIndexValue = -1L, rightIndexValue = -1L;
        long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
        SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
        if (null != sbr) {
            ByteBuffer byteBuffer = sbr.getByteBuffer();
            high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
            try {
                while (high >= low) {
                    //通过最小偏移量和最大偏移量获取中位偏移量
                    midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                    //...省略数据读取
                    //从commitlog中获取当前位置消息的时间
                    long storeTime =
                        this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                    if (storeTime < 0) {
                        return 0;
                    } else if (storeTime == timestamp) {
                        targetOffset = midOffset;
                        break;
                    } else if (storeTime > timestamp) {
                         // 如果消息大于当前中位消息时间 则再往前进行二分查找
                        high = midOffset - CQ_STORE_UNIT_SIZE;
                        rightOffset = midOffset;
                        rightIndexValue = storeTime;
                    } else {
                       // 如果消息大于当前中位消息时间 则再往后进行二分查找
                        low = midOffset + CQ_STORE_UNIT_SIZE;
                        leftOffset = midOffset;
                        leftIndexValue = storeTime;
                    }
                }

                if (targetOffset != -1) {

                    offset = targetOffset;
                } else {
                    if (leftIndexValue == -1) {

                        offset = rightOffset;
                    } else if (rightIndexValue == -1) {

                        offset = leftOffset;
                    } else {
                        offset =
                            Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                - rightIndexValue) ? rightOffset : leftOffset;
                    }
                }

                return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
            } finally {
                sbr.release();
            }
        }
    }
    return 0;
}
复制代码

通过上面我们可以看出 rocketmq通过二分查找consumeQueue的方式去commitlog中检索对应消息的时间点进行比较。

IndexFile

IndexFile 文件存储的是 包含Key的消息,当生产者生产消息到Broker的时候,Broker接收消息的是发现消息包含Key的时候 会将对应消息的索引记录在IndexFile中,只记录包含Key的消息是因为 RocketMq 可以通过制定key查询消息

IndexFile 文件名称是已创建文件时间的时间戳命令 20210901183407523

文件中包含三部分 Header头,Slots槽位,Indexes索引单元列表

image.png

IndexHeader

属性 描述 类型
beginTimestamp 第一条消息时间戳 Long
endTimestampIndex 最后一条消息时间戳 Long
beginPhyOffset 在commitlog中开始的offset Long
endPhyOffset 在commitlog中结束的offset Long
hashSlotCount 已使用的SlotCount Int
IndexCount 索引单元的数量 Int

索引文件中的Header 主要包含以上几个部分 所占用大小就等于 8 + 8 + 8 + 8 + 4 + 4 = 40个字节,在RocketMQ源码中我们可以看到IndexHeader类中


private static int beginTimestampIndex = 0;
private static int endTimestampIndex = 8;
private static int beginPhyoffsetIndex = 16;
private static int endPhyoffsetIndex = 24;
private static int hashSlotcountIndex = 32;
private static int indexCountIndex = 36;
....

private AtomicLong beginTimestamp = new AtomicLong(0);
private AtomicLong endTimestamp = new AtomicLong(0);
private AtomicLong beginPhyOffset = new AtomicLong(0);
private AtomicLong endPhyOffset = new AtomicLong(0);
private AtomicInteger hashSlotCount = new AtomicInteger(0);
private AtomicInteger indexCount = new AtomicInteger(1);
复制代码

像代码中的Index 就表示在文件中对应存储的位置,所有的值都是通过Atomic方式去实现的 使用的CAS的方式 也就是多线程下是安全的

Slot

Slot是槽位,每一个槽位占用4个字节也就是Int值 表示的是索引单元列表(Indexes)中当前槽位 key最新的索引值(这句话可能有点绕 通过代码和图来理解下),在IndexFile类中的实现

public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
   final long endPhyOffset, final long endTimestamp) throws IOException {

int fileTotalSize =
   IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
   
 ....
 this.hashSlotNum = hashSlotNum;
}
复制代码
private int maxHashSlotNum = 5000000;
private int maxIndexNum = 5000000 * 4;
复制代码

fileTotalSize表示整个IndexFile文件大小是固定的,maxHashSlotNum表示最多槽位为500万个,maxIndexNum表示最多索引单元为 2000万个 那么可以知道文件大小为500万*4 + 2000万 * 20 + 40

那么每一个key如何计算所在的槽位呢?
是通过key的hashcode 取模 500万

int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
复制代码

slotPos就表示当前消息key所在的槽位索引

每一个槽位中的值 都是最新索引单元 所对应Indexes中的索引的Position 对应源码中如下

// indexHead也就是40B 加上槽位索引*4B 获取当前槽位在文件中的具体position
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
// 通过position 取得对应槽位上的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
    slotValue = invalidIndex;
}

....

// 设置槽位position的最新索引值为当前索引单元的count 也就是插入进去的索引单元在 indexes中的索引
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

复制代码

基本关系 基本就如下图所示,indexes中文件格式下面为说到 目前方便理解是这么画的

image.png

Index

index 索引单元存储 数据结构如下

属性 描述 类型
keyHash 消息key的hashCode Int
phyOffset 消息在Commitlog文件中的偏移量 Long
timeDiff 当前消息和文件起始消息的时间差 精确到秒 Int
preSlotValue 上一个索引单元对应的下标值(方便理解) Int

所以每一个索引单元占用的大小为 4+8+4+4 = 20B

三者之间的关系

image.png

通过Key查询消息的流程

  1. 获取对应Key的HashCode
  2. 通过HashCode找到槽位
  3. 通过槽位获取最新的Index索引
  4. 通过索引中的HasCode比对 依次向上查找 这里还可以通过时间筛选
  5. 将对应符合条件的Index单元中的 commitLog的offset记录 查询对应消息

具体实现代码如下

....
for (int nextIndexToRead = slotValue; ; ) {
    if (phyOffsets.size() >= maxNum) {
        break;
    }

    int absIndexPos =
        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
            + nextIndexToRead * indexSize;

    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

    long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

    if (timeDiff < 0) {
        break;
    }

    timeDiff *= 1000L;

    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
    boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

    if (keyHash == keyHashRead && timeMatched) {
        phyOffsets.add(phyOffsetRead);
    }

    if (prevIndexRead <= invalidIndex
        || prevIndexRead > this.indexHeader.getIndexCount()
        || prevIndexRead == nextIndexToRead || timeRead < begin) {
        break;
    }

    nextIndexToRead = prevIndexRead;
}
...
复制代码

这三者之间的关系 通过语言描述会导致比较绕 其实还是比较简单的

  1. slot中对应长度有500W
  2. 通过Key 取模 500W 找到对应 槽位对应的Position
  3. 槽位中的值是 当前槽位对应索引单元中的索引最新的Position
  4. 索引单元中 会存储上一个相同的Slot槽位 索引单元的Position

这样设计是因为 通过HasCode取模 槽位是会冲突的 冲突的方式采用链表一样的逻辑连接起来,其实和HasMap是比较像 只是JDK8中的HashMap还会有红黑树、RocketMQ是在文件中的存储

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享