RocketMQ 之 DefaultMessageStore 核心消息存储类

概述

broker 重启后,broker 如何加载磁盘上的文件呢?这里面有个很重要的类 DefaultMessageStore 在协助 broker 加载磁盘文件。

可以将 DefaultMessageStore 理解为 文件存储控制类。该类聚合了 CommitLog, ConsumeQueue, IndexFile 等重要存储类。

DefaultMessageStore 初始化核心流程为:初始化 -> load -> start

实例化

DefaultMessageStorebroker 启动时,就会被实例化。
DefaultMessageStore 初始化相对简单,就是 实例化 各个属性

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        // 实例化 CommitLog
        this.commitLog = new CommitLog(this);
    }
    this.consumeQueueTable = new ConcurrentHashMap<>(32);
    
    // ConsumeQueue 刷盘线程
    this.flushConsumeQueueService = new FlushConsumeQueueService();
    // 清理 commitlog 的线程
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    
    // IndexFile
    this.indexService = new IndexService(this);
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    // 转发 CommitLog 的线程
    this.reputMessageService = new ReputMessageService();

    this.scheduleMessageService = new ScheduleMessageService(this);

    this.transientStorePool = new TransientStorePool(messageStoreConfig);

    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }

    this.allocateMappedFileService.start();

    this.indexService.start();
    
    // ReputMessageService 默认转发到 ConsumeQueue、IndexFile
    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}
复制代码

load – 加载磁盘文件

DefaultMessageStore 在被实例化后,此时会开始加载磁盘上的文件。
加载过程如下:

  1. 加载 CommitLog
  2. 加载 ConsumeQueue
  3. 加载 IndexFile
  4. 根据 ConsumeQueue 中存储的 commitlog 最大偏移量,对内存中 CommitLog 重新设置 写偏移量(下面会讲解为什么会执行这一步)

start – 启动各个刷盘线程

start 是在执行完 load 之后执行的动作。主要做的工作,就是启动线程.
整体流程如下:

  1. 启动 ReputMessageService 线程, 将 CommitLog 中未转发至 ConsumeQueue 的消息转发至 ConsumeQueue
  2. 启动 ConsumeQueue 刷盘线程、CommitLog 刷盘线程 等.

现在来看下 load 中,为什么要执行第 4 步?
其实,就是为 start 流程的第 1 个步骤做准备。因为 ConsumeQueue 的刷盘频率其实不高。进程退出后,消息写入了 CommitLog,可能还未写入 ConsumeQueue。因此需要通过 ConsumeQueueCommitLog 的最大偏移量,知道有哪些 消息是还未转发至 ConsumeQueue 的,不然消费端就无法消费到消息了

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享