commitlog 加载
当我们把 broker
重启后,RocketMQ
是如何重新将磁盘的 commitlog
文件重新加载到磁盘中的呢?带着这个疑问,我们来看一下 commitlog
的加载流程
commitlog
的加载流程大致如下:
我们在这里主要关心 第1,2 个流程。第 3 个流程,在讲解完 ConsumeQueue
, IndexFile
会重点说明。
稍微带下第3个流程,第 3 个流程逻辑为,将 commitlog
存入到 consumeQueue
。 这里的 commitlog
是未被转发存储到 consumeQueue
中的消息
DefaultMessageStore 初始化
在 rocketMQ 的世界中,我们知道 commitlog
被抽象为 CommitLog
类。每个文件被抽象为 MappedFile
。但是有一个很关键的类 DefaultMessageStore
统筹着所有文件的加载。
该类在初始化时,会顺带把 CommitLog
也初始化了
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// ...
// 初始化 commitLog
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
// ...
}
复制代码
DefaultMessageStore 初始化
在 RocketMQ
中,commitlog
被抽象为 CommitLog
类。每个文件被抽象为 MappedFile
。DefaultMessageStore
则作为一个 ‘聚合类’, 统筹着所有文件的加载。
该类在初始化时,会顺带把 CommitLog
也初始化了
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// ...
// 初始化 commitLog
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
// ...
}
复制代码
DefaultMessageStore#load() 加载磁盘上的文件
laod()
方法,主要是将磁盘上相关文件, 如 commitlog、consumeQueue、indexFile
加载到内存中.
DefaultMessageStore
在执行 load()
方法时,会委托 CommitLog
将 commitlog
加载到内存中。而 CommitLog#load()
方法,做的处理就是将存储目录下的 commitlog
文件加载到内存
public class DefaultMessageStore implements MessageStore {
public boolean load() {
boolean result = true;
try {
//todo 如果 abort 文件存在,则表示,当前是非正常退出
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// 委托 CommitLog 加载 Commit Log
result = result && this.commitLog.load();
// load Consume Queue
result = result && this.loadConsumeQueue();
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.indexService.load(lastExitOK);
// todo 根据是否正常退出,恢复 CommitLog
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
}
复制代码
在这个方法中,我们注意到当 abort
文件存在时,broker
会被判定为非正常退出。
并且在通过 CommitLog
将 commitlog
文件加载到内存后。还会执行一个 recover()
对 commitlog
信息进行复原。
为什么还需要对 commitlog
进行恢复?
原因是:commitlog
文件中,存储的消息不一定正确。当 broker
非正常退出时,某条消息可能只存了一半进去。recover()
方法,就是用来将不合法的消息剔除掉。
public class DefaultMessageStore implements MessageStore {
private void recover(final boolean lastExitOK) {
// todo 先恢复 consumeQueue, 获取 consumeQueue 中存储的 commitlog 最大的偏移量
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
// todo 不正常退出和正常退出的恢复区别为:
// todo 不正常退出,会根据 maxPhyOffsetOfConsumeQueue 找到哪个文件是损坏的,并从该文件开始恢复
// todo 正常退出,则直接从后 3 个文件开始恢复
if (lastExitOK) {
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
this.recoverTopicQueueTable();
}
}
复制代码