RocketMQ 之 CommitLog 加载流程

commitlog 加载

当我们把 broker 重启后,RocketMQ 是如何重新将磁盘的 commitlog 文件重新加载到磁盘中的呢?带着这个疑问,我们来看一下 commitlog 的加载流程

commitlog 的加载流程大致如下:

我们在这里主要关心 第1,2 个流程。第 3 个流程,在讲解完 ConsumeQueue, IndexFile 会重点说明。

稍微带下第3个流程,第 3 个流程逻辑为,将 commitlog 存入到 consumeQueue。 这里的 commitlog 是未被转发存储到 consumeQueue 中的消息

DefaultMessageStore 初始化

在 rocketMQ 的世界中,我们知道 commitlog 被抽象为 CommitLog 类。每个文件被抽象为 MappedFile。但是有一个很关键的类 DefaultMessageStore 统筹着所有文件的加载。

该类在初始化时,会顺带把 CommitLog 也初始化了

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        
        // ...
        // 初始化 commitLog
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        // ... 
    }
复制代码

DefaultMessageStore 初始化

RocketMQ 中,commitlog 被抽象为 CommitLog 类。每个文件被抽象为 MappedFileDefaultMessageStore 则作为一个 ‘聚合类’, 统筹着所有文件的加载。

该类在初始化时,会顺带把 CommitLog 也初始化了

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
        
        // ...
        // 初始化 commitLog
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            this.commitLog = new DLedgerCommitLog(this);
        } else {
            this.commitLog = new CommitLog(this);
        }
        // ... 
    }
复制代码

DefaultMessageStore#load() 加载磁盘上的文件

laod() 方法,主要是将磁盘上相关文件, 如 commitlog、consumeQueue、indexFile 加载到内存中.

DefaultMessageStore 在执行 load() 方法时,会委托 CommitLogcommitlog 加载到内存中。而 CommitLog#load() 方法,做的处理就是将存储目录下的 commitlog 文件加载到内存

public class DefaultMessageStore implements MessageStore {
    public boolean load() {
        boolean result = true;

        try {
            //todo 如果 abort 文件存在,则表示,当前是非正常退出
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

            if (null != scheduleMessageService) {
                result = result && this.scheduleMessageService.load();
            }

            // 委托 CommitLog 加载 Commit Log
            result = result && this.commitLog.load();

            // load Consume Queue
            result = result && this.loadConsumeQueue();

            if (result) {
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));

                this.indexService.load(lastExitOK);

                // todo 根据是否正常退出,恢复 CommitLog
                this.recover(lastExitOK);

                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
            }
        } catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }

        if (!result) {
            this.allocateMappedFileService.shutdown();
        }

        return result;
    }
 }
复制代码

在这个方法中,我们注意到当 abort 文件存在时,broker 会被判定为非正常退出。
并且在通过 CommitLogcommitlog 文件加载到内存后。还会执行一个 recover()commitlog 信息进行复原。

为什么还需要对 commitlog 进行恢复?
原因是:commitlog 文件中,存储的消息不一定正确。当 broker 非正常退出时,某条消息可能只存了一半进去。recover() 方法,就是用来将不合法的消息剔除掉。

public class DefaultMessageStore implements MessageStore {
    private void recover(final boolean lastExitOK) {
        // todo 先恢复 consumeQueue, 获取 consumeQueue 中存储的 commitlog 最大的偏移量
        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

        // todo 不正常退出和正常退出的恢复区别为:
        // todo 不正常退出,会根据 maxPhyOffsetOfConsumeQueue 找到哪个文件是损坏的,并从该文件开始恢复
        // todo 正常退出,则直接从后 3 个文件开始恢复
        if (lastExitOK) {
            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
        } else {
            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
        }

        this.recoverTopicQueueTable();
    }
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享