概述
当 broker
重启后,broker
如何加载磁盘上的文件呢?这里面有个很重要的类 DefaultMessageStore
在协助 broker
加载磁盘文件。
可以将 DefaultMessageStore
理解为 文件存储控制类。该类聚合了 CommitLog
, ConsumeQueue
, IndexFile
等重要存储类。
DefaultMessageStore
初始化核心流程为:初始化 -> load -> start
实例化
DefaultMessageStore
在 broker
启动时,就会被实例化。
DefaultMessageStore
初始化相对简单,就是 实例化 各个属性
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
this.brokerConfig = brokerConfig;
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
this.allocateMappedFileService = new AllocateMappedFileService(this);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
// 实例化 CommitLog
this.commitLog = new CommitLog(this);
}
this.consumeQueueTable = new ConcurrentHashMap<>(32);
// ConsumeQueue 刷盘线程
this.flushConsumeQueueService = new FlushConsumeQueueService();
// 清理 commitlog 的线程
this.cleanCommitLogService = new CleanCommitLogService();
this.cleanConsumeQueueService = new CleanConsumeQueueService();
this.storeStatsService = new StoreStatsService();
// IndexFile
this.indexService = new IndexService(this);
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
// 转发 CommitLog 的线程
this.reputMessageService = new ReputMessageService();
this.scheduleMessageService = new ScheduleMessageService(this);
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {
this.transientStorePool.init();
}
this.allocateMappedFileService.start();
this.indexService.start();
// ReputMessageService 默认转发到 ConsumeQueue、IndexFile
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent());
lockFile = new RandomAccessFile(file, "rw");
}
复制代码
load – 加载磁盘文件
DefaultMessageStore
在被实例化后,此时会开始加载磁盘上的文件。
加载过程如下:
- 加载
CommitLog
- 加载
ConsumeQueue
- 加载
IndexFile
- 根据
ConsumeQueue
中存储的commitlog
最大偏移量,对内存中CommitLog
重新设置 写偏移量(下面会讲解为什么会执行这一步)
start – 启动各个刷盘线程
start
是在执行完 load
之后执行的动作。主要做的工作,就是启动线程.
整体流程如下:
- 启动
ReputMessageService
线程, 将CommitLog
中未转发至ConsumeQueue
的消息转发至ConsumeQueue
- 启动
ConsumeQueue
刷盘线程、CommitLog
刷盘线程 等.
现在来看下 load
中,为什么要执行第 4 步?
其实,就是为 start
流程的第 1 个步骤做准备。因为 ConsumeQueue
的刷盘频率其实不高。进程退出后,消息写入了 CommitLog
,可能还未写入 ConsumeQueue
。因此需要通过 ConsumeQueue
中 CommitLog
的最大偏移量,知道有哪些 消息是还未转发至 ConsumeQueue
的,不然消费端就无法消费到消息了
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END