commitlog 加载
当我们把 broker 重启后,RocketMQ 是如何重新将磁盘的 commitlog 文件重新加载到磁盘中的呢?带着这个疑问,我们来看一下 commitlog 的加载流程
commitlog 的加载流程大致如下:

我们在这里主要关心 第1,2 个流程。第 3 个流程,在讲解完 ConsumeQueue, IndexFile 会重点说明。
稍微带下第3个流程,第 3 个流程逻辑为,将 commitlog 存入到 consumeQueue。 这里的 commitlog 是未被转发存储到 consumeQueue 中的消息
DefaultMessageStore 初始化
在 rocketMQ 的世界中,我们知道 commitlog 被抽象为 CommitLog 类。每个文件被抽象为 MappedFile。但是有一个很关键的类 DefaultMessageStore 统筹着所有文件的加载。
该类在初始化时,会顺带把 CommitLog 也初始化了
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// ...
// 初始化 commitLog
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
// ...
}
复制代码
DefaultMessageStore 初始化
在 RocketMQ 中,commitlog 被抽象为 CommitLog 类。每个文件被抽象为 MappedFile。DefaultMessageStore 则作为一个 ‘聚合类’, 统筹着所有文件的加载。
该类在初始化时,会顺带把 CommitLog 也初始化了
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// ...
// 初始化 commitLog
if (messageStoreConfig.isEnableDLegerCommitLog()) {
this.commitLog = new DLedgerCommitLog(this);
} else {
this.commitLog = new CommitLog(this);
}
// ...
}
复制代码
DefaultMessageStore#load() 加载磁盘上的文件
laod() 方法,主要是将磁盘上相关文件, 如 commitlog、consumeQueue、indexFile 加载到内存中.
DefaultMessageStore 在执行 load() 方法时,会委托 CommitLog 将 commitlog 加载到内存中。而 CommitLog#load() 方法,做的处理就是将存储目录下的 commitlog 文件加载到内存
public class DefaultMessageStore implements MessageStore {
public boolean load() {
boolean result = true;
try {
//todo 如果 abort 文件存在,则表示,当前是非正常退出
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// 委托 CommitLog 加载 Commit Log
result = result && this.commitLog.load();
// load Consume Queue
result = result && this.loadConsumeQueue();
if (result) {
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
this.indexService.load(lastExitOK);
// todo 根据是否正常退出,恢复 CommitLog
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
}
复制代码
在这个方法中,我们注意到当 abort 文件存在时,broker 会被判定为非正常退出。
并且在通过 CommitLog 将 commitlog 文件加载到内存后。还会执行一个 recover() 对 commitlog 信息进行复原。
为什么还需要对 commitlog 进行恢复?
原因是:commitlog 文件中,存储的消息不一定正确。当 broker 非正常退出时,某条消息可能只存了一半进去。recover() 方法,就是用来将不合法的消息剔除掉。
public class DefaultMessageStore implements MessageStore {
private void recover(final boolean lastExitOK) {
// todo 先恢复 consumeQueue, 获取 consumeQueue 中存储的 commitlog 最大的偏移量
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
// todo 不正常退出和正常退出的恢复区别为:
// todo 不正常退出,会根据 maxPhyOffsetOfConsumeQueue 找到哪个文件是损坏的,并从该文件开始恢复
// todo 正常退出,则直接从后 3 个文件开始恢复
if (lastExitOK) {
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
this.recoverTopicQueueTable();
}
}
复制代码























![[桜井宁宁]COS和泉纱雾超可爱写真福利集-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/4d3cf227a85d7e79f5d6b4efb6bde3e8.jpg)

![[桜井宁宁] 爆乳奶牛少女cos写真-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/d40483e126fcf567894e89c65eaca655.jpg)