broker
主函数
创建brokercontroller
-
设置系统变量remoting版本号
-
如果没有系统变量定义socket发送缓冲区的大小,则NettySystemConfig.socketSndbufSize设置为128k
-
如果没有系统变量定义socket接收缓冲区的大小,则NettySystemConfig.socketRcvbufSize设置为128k
-
构建命令行参数
- -h help
- -n namesrvAddr
- -c configFile
- -p printConfigItem
- -m printImportantConfig
-
解析命令行参数
-
如果命令行参数中有-h 则在控制台打印命令参数,程序退出
-
构建配置
- new 出四大配置对象BrokerConfig,NettyServerConfig,NettyClientConfig, MessageStoreConfig
- 配置nettyClientConfig是否使用tls
- nettyServerConfig设置监听端口10911
- 如果有-c则从-c指示的位置读出配置文件,将配置文件中的配置项,加载入四大配置类中
- 将命令行参数转化为property,并载入BrokerConfig中
- 如果没有配置rocketmqhome系统参数或者环境变量,则程序退出
- 校验namesrvAddr是否符合ip:port的格式,如果不符合程序退出
- 如果broker的角色是ASYNC_MASTER或者SYNC_MASTER则brokerid设置为0表示是master
- 如果broker的角色是SLAVE,但是brokerid的值小于0则退出程序
- ha的监听端口设置为server监听端口+1
- 日志配置
- 如果命令行参数有-p则在控制台打印四大配置对象,程序退出
- 如果命令行参数有-m,则在控制台打印四大配置的重要属性,程序退出
- 将四大配置对象打印到broker.log中
-
用四大配置对象构建BrokerController对象
-
new 出各个功能模块
- 消费者offset管理
- topic配置管理
- 拉消息处理器
- 拉消息请求保存
- 消息到达通知
- 消费者变更监听
- 消费者管理
- 消费者过滤
- 生产者管理
- 连接管理
- broker反向给client发送请求
- 消费组管理
- 外部api
- 过滤器服务管理
- 从服务器同步
- 统计
- 快速失败
-
-
BrokerController加载
-
如果加载失败,则关闭BrokerController,程序退出
-
topic配置管理器加载数据
- 默认从/${user.home}/store/config/topics.json读出字符串,如果读不到则从topics.json.bak中读取
- 将字符串反json成TopicConfigSerializeWrapper对象
- 将topicConfigTable:
ConcurrentHashMap<String, TopicConfig>(1024);
-
dataVersion : DataVersion();载入内存
- 如果上一步成功,消费者offset管理器加载数据 - 默认从/${user.home}/store/config/consumerOffset.json读出字符串,如果读不到则从consumerOffset.json.bak中读取 - 将字符串反json成ConsumerOffsetManager对象 - 将其中的offsetTable:ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>>载入当前offset管理器中 - 如果上一步成功,则消费组管理器加载数据 - 默认从/${user.home}/store/config/subscriptionGroup.json读出字符串,如果读不到则从subscriptionGroup.json.bak中读取 - 将字符串反json成SubscriptionGroupManager对象 - 把数据载入当前的SubscriptionGroupManager对象中 - 如果上一步成功,则消费者过滤管理器加载数据 - 默认从/${user.home}/store/config/consumerFilter.json读出字符串,如果读不到则从consumerFilter.json.bak中读取 - 将字符串反json成ConsumerFilterManager对象 - 把数据载入当前的ConsumerFilterManager对象中 - 如果上一步成功,则new 默认消息存储类 - 消息到达监听器赋值 - broker配置赋值 - 消息存储配置赋值 - broker状态管理器赋值 - new 一个分配内存映射文件service,并赋值 - new一个提交日志对象,并赋值 - 如果是Dleger模式则new DLedgerCommitLog - 否则则new CommitLog - new一个内存映射文件队列并赋值 - new一个flush提交日志service并赋值 - 如果是同步刷盘则new 组提交service - 如果是异步刷盘则new 实时flush service - new一个实时提交service并赋值 - new 一个追加消息默认回调类并赋值 - new一个保存批量消息解码器的线程局部变量并赋值 - new 一个put消息锁并赋值 - 如果配置使用可重入锁则new 一个可重入锁 - 否则new一个自旋锁,默认new一个自旋锁 - new 一个消费队列table,ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> 并赋值 - new 一个flush消费队列service,并赋值 - new 一个清理提交日志service,并赋值 - new一个清理消费队列service,并赋值 - new一个存储状态service,并赋值 - new 一个索引service,并赋值 - 哈希槽数量赋值,默认为500w - 索引个数赋值,默认为2000w - 索引存储目录赋值,默认为/${user.home}/store/index - 如果不是Dleger模式则new 一个HA service 并赋值 - new一个重放消息service并赋值 - new 一个定时消息service,并赋值 - new一个瞬时存储池,并赋值 - 如果配置了瞬时存储池(默认是关闭的),且当前broker是异步刷盘的主broker,则初始化瞬时存储池 - 默认分配5个堆外内存,每个的大小为1g - 并且调用os的mlock函数对5个堆外内存都进行锁定,以使得这些内存不会到swap区 - 将这些堆外内存收集到ConcurrentLinkedDeque中 - 启动分配内存映射文件service - 启动索引service - new一个提交日志分发器的list - 往list中添加,建立消费队列提交日志分发器 - 往list中添加,建立索引的提交日志分发器 - new 一个RandomAccessFile作为锁文件,并赋值 - 如果是Dleger模式 - new一个broker状态统计类 - new一个消息存储插件上下文 - 用消息存储插件上下文和消息存储类,利用MessageStoreFactory类的build方法构建消息存储并赋值 - 从broker配置中获取消息存储插件,默认是空 - 如果不为空,则用,为分隔符分隔出插件类 - 方向遍历插件类列表,用反射将插件类构建出来,构成一个消息存储的责任链 - 在消息存储的分配器list中加入一个提交日志分发器计算bitmap - 消息存储加载数据 - 查看上次是否安全退出 - 如果/${user.home}/store/abort文件不存在则是安全退出 - 定时消息service加载数据 - 从/${user.home}/store/config/delayOffset.json中读取字符串,如果没读到中delayOffset.json.bak中读 - 将字符串反json成DelayOffsetSerializeWrapper对象 - 载入DelayOffsetSerializeWrapper对象中的offsetTable:ConcurrentMap<Integer /* level */, Long/* offset */> - 构建delayLevelTable:ConcurrentMap<Integer /* level */, Long/* delay timeMillis */>每一个延迟级别对应的延迟毫秒数 - 构建最大延迟级别maxDelayLevel - 提交日志加载数据 - 内存映射文件队列加载数据 - 查看/${user.home}/store/commitlog目录下所有的提交日志文件 - 对文件进行排序,本质是对文件路径进行排序 - 对所有的文件进行遍历 - 对文件大小进行检验 - 如果每个文件的大小不是一个g,则broker停止启动 - 用文件路径和文件大小new一个内存映射文件 - 文件路径赋值 - 文件大小赋值 - new 一个File并赋值 - 提取文件名,作为该文件的起始偏移量 - 确保父目录存在 - 用File和”rw“创建一个随机读写文件RandomAccessFile - 从RandomAccessFile中获取FileChannel,并赋值 - FileChannel调用map函数,把文件映射到内存中,并赋值给MappedByteBuffer - 用于统计的映射到内存的文件总大小加1G - 用于统计的映射内存文件数加一 - 如果以上过程有出现异常,则关闭FileChannel - 内存映射文件的写位置设置1为G - 内存映射文件的Flush位置设置为1G - 内存映射文件的提交位置设置为1G - 将内存映射文件加入CopyOnWriteArrayList<MappedFile> mappedFiles中 - 消费队列加载数据 - 遍历/${user.home}/store/consumequeue/${topic}/${queueId}/下的所有文件 - new 消费队列 - new 一个内存映射文件队列并赋值 - 如果enableConsumeQueueExt 为true,默认为false - new 一个ConsumeQueueExt - 维护consumeQueueTable:ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> - 消息队列加载数据 - 内存映射文件队列加载数据 - 如果开启enableConsumeQueueExt,则consumeQueueExt加载数据 - new 一个存储检查点并赋值 - new一个文件/${user.home}/store/checkpoint - new一个RandomAccessFile并赋值 - 从RandomAccessFile中获取FileChannel并赋值 - 通过FileChannel的map函数将文件映射到内存里面获得MappedByteBuffer,并赋值 - 如果检查点文件存在 - 从中分别获取三个long,分别为提交日志,消息队列,消息索引的三个时间戳 - 日志打印这三个时间戳 - 索引service加载数据 - 对索引目录下所有文件进行排序 - 遍历索引存储目录下的所有文件 - 对每一个文件,new 一个索引文件对象 IndexFile,与之相对应 - 计算文件总大小并赋值=40+500w*4+2000w*20 - new 一个内存映射文件对象MappedFile - 获取内存映射文件中的FileChannel并赋值 - 获取内存映射文件中的内存mappedByteBuffer并赋值 - 最大hash槽数赋值 - 最大索引数赋值 - new一个索引文件Header并赋值,IndexHeader - 索引文件加载数据 - 索引文件Header加载从文件中加载数据 - 第一个索引消息落在Broker的时间戳 - 最后一个索引消息落在Broker的时间戳; - 第一个索引消息在commitlog的偏移量; - 最后一个索引消息在commitlog的偏移量; - 构建索引占用的槽位数; - 构建的索引个数; - 如果索引个数都是0,则设置为1 - 如果没有上次没有安全退出,也就是说abort文件存在,且最后一个索引消息的时间戳大于abort文件中的索引消息时间戳,则摧毁索引文件 - 摧毁内存映射文件 - 使用引用计数减一 - 如果引用计数小于等于0则做清理 - 找到终极的bytebuffer - 解除内存映射 - 关闭文件描述符 - 总的映射到内存的文件的大小减去响应的文件大小 - 总映射文件数减一 - 关闭文件渠道FileChannel - 删除文件 - 把索引文件对象加入indexFileList中 - 恢复数据 - 恢复消息队列,返回提交日志当前提交offset - 遍历consumeQueueTable获取所有的消息队列 - 对每个消息队列调用recover - 计算消息队列中最大的提交日志偏移量maxPhysicOffset,表示这个消息队列消息最大存储到maxPhysicOffset-1这个位置 - 计算消息队列的flush位置flushedWhere,也就是说flush-1的位置都flush了,下次flush从flushedWhere开始 - 计算提交位置committedWhere,也就是说下次提交到消息队列的数据从committedWhere开始写 - 摧毁脏文件 - 找到当前提交位置对应的文件,对这个文件设置wrotePosition,committedPosition,flushedPosition - 删掉排序在当前写文件后面的文件 - 摧毁内存映射文件 - 内存映射文件list中删去对应的文件 - 如果有扩展消息队列 - 扩展消息队列恢复数据 - 扩展消息队列摧毁 - 恢复提交日志 - 如果上次正常退出 - 如果commitlog文件被删除 - FlushedWhere设置为0 - CommittedWhere设置为0 - 摧毁消息队列 - commitlog文件存在 - 加载后三个提交日志文件 - 设置flushWhere - 设置commitedWhere - 删除脏文件 - 如果消费队列的最大物理偏移量比加载出来的提交日志的当前偏移量大,则对消息队列进行截断 - 如果不正常退出 - 寻找开始加载的提交日志文件 - 从最后一个文件开始遍历,找到第一个消息的store存储时间小于检查点的时间 - 设置flushWhere - 设置commitedWhere - 删除脏文件 - 如果消费队列的最大物理偏移量比加载出来的提交日志的当前偏移量大,则对消息队列进行截断 - 提交日志恢复HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable - 获取提交日志的最小偏移量 - 遍历consumeQueueTable:ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> - key = logic.getTopic() + "-" + logic.getQueueId() - value = 消息队列的最大偏移量 - 用日志日志的最小偏移量纠正消息队列最小偏移量 - 获取第一个文件 - 开始按20个字节为单位读取文件 - 如果physical偏移量大于等于提交日志的最小偏移量,则修正消息队列的最小偏移量,该偏移量指的是消息队列文件本身的偏移量 - ext消息队列有特殊处理 - 如果上诉步骤有失败,则关闭分配内存映射文件service - 如果上述步骤有成功则new 一个NettyRemotingServer并赋值 - oneway信号量赋值默认256,异步信号量赋值默认64 - new一个ServerBootstrap并赋值 - netty服务端配置赋值 - ClientHousekeepingService赋值 - new一个公共线程池并赋值 - eventLoopGroupBoss和eventLoopGroupSelector赋值 - 是否使用epollo - 要是linux操作系统 - 且epollo开关打开,默认是关闭的 - 且调用netty的Epoll.isAvailable()返回true - 使用epollo则 - new一个单线程的EpollEventLoopGroup并给eventLoopGroupBoss赋值 - new一个3个线程的EpollEventLoopGroup并给eventLoopGroupSelector赋值 - 不是epollo则 - new一个单线程的NioEventLoopGroup并给eventLoopGroupBoss赋值 - new一个3个线程的NioEventLoopGroup并给eventLoopGroupSelector赋值 - 加载ssl上下文 - 如果tls模式不是TlsMode.DISABLED则加载ssl上下文 - 监听端口减2 再new 一个NettyRemotingServer并给fastRemotingServer赋值 - new一个发送消息的线程池并赋值 - new一个拉消息的线程池并赋值 - new一个查询消息的线程池并赋值 - new一个admin 线程池并赋值 - new一个客户端管理线程池并赋值 - new一个心跳线程池并赋值 - new一个结束事务线程池并赋值 - new一个消费者管理线程池并赋值 - 注册处理器 - new一个发送消息处理器 - remotingServer和fastRemotingServer 遇到SEND_MESSAGE,SEND_MESSAGE_V2,SEND_BATCH_MESSAGE,CONSUMER_SEND_MSG_BACK都用发送消息处理器处理,处理线程用发送消息执行器 - remotingServer注册RequestCode.PULL_MESSAGE,拉消息处理器和拉消息线程池 - new一个消息查询处理器 - remotingServer和fastRemotingServer都注册RequestCode.QUERY_MESSAGE和RequestCode.VIEW_MESSAGE_BY_ID,消息查询处理器和消息查询线程池 - new一个客户端管理处理器 - remotingServer,fastRemotingServer注册RequestCode.HEART_BEAT,RequestCode.UNREGISTER_CLIENT,RequestCode.CHECK_CLIENT_CONFIG,用客户端管理处理器和客户端管理线程池 - new一个消费者管理处理器 - remotingServer和fastRemotingServer注册RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET用消费者管理处理器和消费者管理线程池 - new一个结束事务线程池,remotingServer和fastRemotingServer注册RequestCode.END_TRANSACTION用结束事务处理器和结束事务线程池 - new一个admin处理器,remotingServer和fastRemotingServer将之注册为默认处理器 - broker状态管理器的记录方法,每天调一次 - 消费者offset持久化,每5s调用一次 - 消费者过滤管理器持久化,每10s执行一次 - brokerController的保护broker方法每3分钟执行一次 - brokerController的打印水位方法每分钟执行异常,打印发送消息线程池、拉消息线程池、查询消息线程池和结束事务线程池的任务队列的个数,以及任务队列中头节点的创建时间和当前时间的差值 - 分配落后的消息,每分钟执行一次 - 如果配置ns则brokerOuterAPI直接赋值ns否则2分钟获取一次ns地址 - 如果不是Dleger模式则 - 如果该broker是从 - 如果有配置ha主服务器地址,则赋值 - 周期性更新ha主服务器地址开关,赋值 - 如果该broker是主 - 打印主和从的差异,每分钟执行一次 - 如果允许tls - new一个文件监视service, - 初始化事务 - new 一个事务消息service并赋值 - 如果META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService文件存在,则用里面的类new一个实例 - 否则 new一个TransactionalMessageServiceImpl - new一个事务消息监测监听器并赋值 - 如果META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener存在,则里面的类new一个实例 - new一个DefaultTransactionalMessageCheckListener - new一个事务消息监测service并赋值 - 初始化acl - 如果acl开关不打开则直接返回,默认是不打开的 - 如果META-INF/service/org.apache.rocketmq.acl.AccessValidator文件存在则对里面的所有类new出实例 - 维护访问校验器map,accessValidatorMap - 将访问检验器注册进rpc 钩子中 - 初始化rpc钩子 - 如果META-INF/service/org.apache.rocketmq.remoting.RPCHook存在则注册其中的rpc钩子 复制代码
-
-
给java进程加上关闭钩子,进程退出的时候会调用
- 关闭brokercontroller
启动brokercontroller
-
启动brokerController
-
启动消息存储对象messageStore
-
获取锁文件的文件锁
-
如果获取不到文件锁则抛出异常,表示当前mq已经启动
-
将“lock”写入锁文件
-
flush进磁盘
-
获取所有的消息队列在commitlog的最大偏移量maxPhysicalPosInLogicQueue
-
如果maxPhysicalPosInLogicQueue = -1 则设置为0
-
如果maxPhysicalPosInLogicQueue 小于commitlog的最小偏移量则maxPhysicalPosInLogicQueue设置为commitlog最小偏移量
-
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
-
this.reputMessageService.start();
-
进入等待直到消息队列和消息索引不在落后
-
加载TopicQueueTable
-
如果不是Dleger模式则haService启动
-
如果不是Dleger模式,则从broker关闭定时消息service否则启动定时消息service
-
flushConsumeQueueService启动
-
commitLog启动
-
Dledger模式
- 启动commitlog刷盘service,
- 如果开启了isTransientStorePoolEnable,则启动CommitRealTimeService
-
非Dledger模式
-
-
storeStatsService启动
-
创建临时文件
-
添加定时任务
-
-
启动remotingServer和启动fastRemotingServer
-
new一个DefaultEventExecutorGroup并赋值,线程池数默认是8
-
准备共享的处理器
- new一个握手处理器HandshakeHandler并赋值
- new一个编码器并赋值NettyEncoder
- new一个netty连接管理处理器并赋值NettyConnectManageHandler
- new一个服务端处理器并赋值NettyServerHandler
-
配置ServerBootstrap
-
设置eventLoopGroupBoss,eventLoopGroupSelector
-
设置渠道
- 如果使用epoll则EpollServerSocketChannel.class
- 否则NioServerSocketChannel.class
-
设置SO_BACKLOG=1024
-
设置端口复用为true,SO_REUSEADDR
-
关闭SO_KEEPALIVE
-
TCP_NODELAY为true
-
设置接收缓冲区和发送缓冲区为128k
-
设置监听ip和端口,监听ip为通配符,端口默认为8888
-
加处理器
- handshakeHandler
- encoder
- NettyDecoder
- IdleStateHandler
- connectionManageHandler
- serverHandler
-
配置使用ByteBuf内存池
-
-
进行端口绑定
-
启动NettyEventExecutor
-
删除过期的responseFuture,每分钟执行一次
-
-
启动fileWatchService
-
启动brokerOuterAPI
-
启动pullRequestHoldService
-
启动clientHousekeepingService
-
启动filterServerManager
-
如果不是Dleger模式
-
如果是主则启动事务消息检测service
-
如果是从
-
同步方法每10s执行一次
- 同步topic配置
- 同步消费者offset
- 同步delay offset
- 同步订阅组配置
-
-
-
注册broker
-
注册broker,每30s执行一次
-
启动broker状态管理器
-
启动broker快速失败
-
-
在broker.log中打印日志
消息存储模块
发送消息处理器
- RequestCode.SEND_MESSAGE
- RequestCode.SEND_MESSAGE_V2
- RequestCode.SEND_BATCH_MESSAGE
如果是RequestCode.CONSUMER_SEND_MSG_BACK:
-
获取请求头ConsumerSendMsgBackRequestHeader
-
执行消费钩子的executeConsumeMessageHookAfter方法
-
获取消费组配置对象,如果没有,但是自动创建的开关打开,则创建一个消费组配置对象
-
进行校验
- 是否有消费组配置对象
- 是否有权限
- 重试队列的个数是否大于0
-
生成新的topic的名字和路由队列编号
- 新的topic=%RETRY%+consumeGroup
- 路由队列重试topic默认只有一个重试队列因此也不需要路由
-
创建topic配置对象,并且往topic配置管理器中注册,持久化,往ns同步
-
根据commitlog的偏移量取出消息
-
之后开始判断要将这个消息重新放到私信队列还是重试队列中,延迟级别如果小于0则表示这个消息直接进入死信队列,如果延迟级别=0则延迟级别由服务端控制,如果大于0则是客户端控制。如果客户端版本为V3_4_9或以上则最重试次数由客户端控制,否则由服务端控制。如果延迟级别小于0或者重试次数达到最大次数,则是要进入死信队列的,对于一个消息组来说在一个broker默认是只有一个重试队列和一个私信队列。做完判断之后会将这个消息重新投到重试队列或者重试队列中,消息的topic和queue对被改掉,但是会将原来的topic和偏移量保存在新消息的property中,这个新的消息的重试次数会比原来加1
-
消息的重试次数属性在服务端加一
-
异步刷盘
-
消息会这样流转,首先是对某个topic的消息进行消费,消费失败了,则会将这个消息发回给broker,broker会将这个消息写入重试队列中,然后会在这个消息的property中保存原始消息的topic和offset并且让重试次数加1,之后消息者消费失败在将这个消息发回broker,broker会将这个消息在写入重试队列,并且重试此时在加1,直到重试次数达到最大,这个消息则会被放入死信队列中,不再进行消费。默认的最大重试次数为16,所以最多会进行17次的消费。
内存映射文件生成器,给commitlog用
- 是一个独立线程
- 外部线程提交请求,独立线程提取请求生成文件,外部线程利用countdownlatch做等待,独立线程执行完调用countdown
- 内存预热,内存锁定,内存建议,刷盘
- 关闭的时候,摧毁生成超时的文件
刷盘,针对commitlog说的,是一个独立线程
-
GroupCommitService同步刷盘
- 等刷盘了在返回isWaitStoreMsgOK=true, 会提交一个请求给独立线程,然后唤醒独立线程,然后利用countdownlatch做同步等待,等待刷盘
- 不等刷盘了直接返回,会唤醒独立线程
- 一个独立线程来完成刷盘,每10毫秒运行一次,运行的时候如果请求队列里有刷盘请求,则一个个取出来进行执行,执行的过程如果flushwhere已经大于要请求要刷的位置,则不刷了,请求执行完要调用请求里的countdownlatch的countdown,唤醒正在等待结果的线程,如果请求队列没有则直接做刷盘
-
异步刷盘
-
如果只是配置异步刷盘则是FlushRealTimeService,如果还配置TransientStorePoolEnable且此时是主broker则不仅仅会启动FlushRealTimeService还会启动CommitRealTimeService
-
只启动FlushRealTimeService,则和同步刷屏差异不大
- 主线程唤醒FlushRealTimeService后直接返回 - 配置:实时刷盘or定时刷盘,默认实时刷盘,如果是实时刷盘则写入commitlog之后会立马唤醒线程进行刷盘,定时刷盘使用的是sleep,因此主线程的唤醒不起作用 - 配置:每个几秒刷盘一次,默认500毫秒 - 配置:每次刷盘至少刷几个页,默认是4个页 - 配置:多久彻底刷盘一次10s 复制代码
-
启动FlushRealTimeService和CommitRealTimeService
- 主线程唤醒CommitRealTimeService后直接返回
- 在这个模式下,是先把消息写到直接内存中,之后会由CommitRealTimeService这个独立线程将直接内存中新写的消息写入到FileChannel中,因此会有一个commitWhere用于标记直接内存写入FileChannel的边界,最后会唤醒FlushRealTimeService,对FileChannel进行刷盘
- 在直接内存已经全部提交给FileChannel则此时该直接内存已经无用,需要归还给transientStorePool
-
-
消息从commitlog往消费队列和索引中的分发
-
是有一个独立线程来完成ReputMessageService
-
每隔一毫秒执行一次
-
会将没有还分发到消费队列和索引中的消息,分发过去,知道当前没有需要搬运的消息
-
读到一个消息,那调用不同的消费分发器将消息进行分发
-
如果是主broker,且支持长轮训(默认支持),则还需要调用消息到达监听的到达接口,通知消息现在已经达到
-
commitlog 消息的分发器
-
CommitLogDispatcherBuildConsumeQueue,会将消息分发给消息队列
- 消息队列虽然有调用了内存映射函数,但是并没有使用,写用的是FileChannel的write,而且没有flush盘
-
CommitLogDispatcherBuildIndex,用于将消息分发给索引
- 索引有使用内存映射
- 当创建新的索引文件的时候,会new一个异步线程对前一个索引文件进行刷盘
- 按索引文件的格式进行存储
- 一个消息存储多次,按uni_key和keys的每一个key进行存储
- 初始的时候索引个数是1,因此第一个索引实际上存到索引区的第二个位置
-
消费队列的刷盘
- 是一个独立线程
- 刷盘间隔,默认为1s
- 每次至少刷几个页默认为2,也就是说当要刷的数据少于2个页不刷
- 多长时间即使不到两个页也要刷盘,默认为1min,也就是说1min要刷盘一次,即使要刷的数据不到2页
文件的清理
-
是一个定时任务
-
默认是一分钟执行一次
-
(1)清理commitlog
-
删除过期的文件
-
文件保留时间,默认为3天
-
删除commitlog文件的间隔,默认为100ms
-
强制摧毁内存映射文件的时间间隔,默认为2分钟
-
删除的时间,默认为4点
-
磁盘空间上是否到了需要删除的临界点
- 磁盘最大使用率 默认为75%
- 磁盘报警阈值 默认为90%
- 磁盘空间强制清理阈值 默认为85%
- 如果磁盘空间使用量超多了报警阈值,则需要将磁盘已满位置位,同时打开立即清理开关,返回到达临界点
- 如果大于磁盘空间强制清理阈值,则打开立即清理开关,返回到达临界点
- 如果大于磁盘最大使用率,不打开立即清理开关,但返回达到临界点
-
时间到或者磁盘到达临界点,则执行删除操作
- 批量删除文件数阈值,默认为10个
- 获取所有的内存映射文件,除了最后一个
- 立即清理开关打开或者文件已经3天没有更新则摧毁内存映射文件,强制摧毁的时间是2分钟
- sleep 100ms再遍历下一个文件
-
-
再次删除挂起的文件
-
所谓挂起的文件只的是available应设置为false,但是引用数还不为0的文件
-
删除挂起的文件的时间间隔,默认为2分钟
-
如果2分钟时间到则执行操作
-
获取第一个文件,如果他是挂起的文件,则将它摧毁
- 如果距离关闭的时间已经过去2分钟,则强制摧毁,即使还有引用数
-
-
-
-
(2)清理消费队列和索引
-
消费队列和索引的删除是依赖于commitlog的删除的,commitlog删除了哪些,那么消费队列和索引要跟着删除
-
删除消费队列文件的时间间隔,默认为100ms
-
遍历所有的消费队列,进行删除,入参是commitlog的最小偏移量,每隔100ms进行下一个消费队列的删除
- 按顺序遍历的内存映射文件,但排除最后一个文件,如果文件的最后一个单元的commitlog偏移量比commitlog的消息偏移量小,则摧毁,强制摧毁的时间为1分钟
-
纠正消费队列的最小偏移量
-
完成对消费队列删除之后,对索引进行删除,入参是commitlog的最小偏移量
- 按顺序遍历索引文件,出了最后一个,如果最后一个索引的commitlog偏移量比当前最小的commitlog的偏移量小则要删除这个索引文件
- 强制删除摧毁索引文件的时间间隔是3s
-
定时消息service
- 数据加载
- 启动
- 说明:写commitlog的时候有一个处理如果消息体有设置延迟级别,那么会有一个处理,会将topic改掉改成SCHEDULE_TOPIC_XXXX,queueid改成延迟级别-1,并且将原来的topic和queueid保存起来。之后就和正常消息一样去写commitlog和消费队列还有索引。定时消息service的作用就是去将这些消息取出,恢复原来的topic和queueid,再写入commitlog,消费队列,索引。
- 是一个独立的单线程
- 非常重要:在将commitlog的消息往消费队列的分发的过程中,有一个特殊处理,那就是如果发现这个消息是一个定时消息,那么则这个时候tagsCode会被置成一个时间戳,这个时间戳到了表示这个消息应该被投递到原来topic上了,把它称作投递时间戳
- 维护一个map,保存18个延迟队列哪些消息以及被投递到原来的topic上,这个map,每10s中执行一次
- 核心逻辑:遍历每一个延迟队列,遍历的过程中将时间到的消息都投递到原来的topic中,一直遍历到第一个时间没到消息,之后再提交一个延迟任务,延迟时间是投递时间和当前时间的时间差,如果所有队列中的所有消息都投递了,那么则提交一个延迟任务,延迟时间是100ms
消息拉取模块
消息拉取请求处理器
-
消息拉取请求的处理
-
首先关于消息的拉取请求的处理线程池的大小为16 + Runtime.getRuntime().availableProcessors() * 2
-
一系列的校验
- 这个broker是否可读
- 订阅组配置是否存在
- 订阅组配置中是否允许消费开关是否开启
- topic配置是否存在
- topic配置中是否有可读权限
- 请求的队列id是否过大
-
客户端上传的几个开关
- 是否可以挂起,客户端默认为可以挂起
- 是否向broker提交offset,默认提交
- 是否每次构建订阅详情,默认为否
-
去读取消息
-
一些限制性的配置
-
accessMessageInMemoryMaxRatio
- 默认为40%
- 如果要获取的消息的起始位置到终止位置的大小大于40%,则认为这个消息是在磁盘中,否则认为是在内存中
-
maxTransferBytesOnMessageInDisk
- 默认为64k
- 当消息在磁盘中,那么这个时候客户端可以拉取到的消息s最大为64k
-
maxTransferCountOnMessageInDisk
- 默认为8
- 当消息在磁盘中的时候,那么这个时候客户端可以拉取的消息的最大个数是8
-
maxTransferBytesOnMessageInMemory
- 默认为256k
- 如果消息在内存中那么这个时候客户端可以拉取到的消息s最大为256k
-
maxTransferCountOnMessageInMemory
- 默认为32
- 如果消息在内存中那么这个时候客户端可以拉取到的消息的最大个数是32
-
-
以上配置读取的第一个消息不起作用
-
一直读取客户端“想要的消息”,直到消息个数已经到达客户端的想要的个数或者达到以上配置的临界或者已经读到消息队列一个文件的尾部了或者已经在消息队列读了16000个字节了
- 如果只是tag过滤的话,则只在消息队列端用tag的hashcode进行过滤不会在commitlog端再次过滤
-
还有一个特殊的情况,如果要从commitlog中去读去消息的过程中发现消息以及被删除了,那么这个时候是这么处理的:首先计算commitlog下一个文件的起始位置,之后读消费队列的时候,会过滤offset小于该起始位置的,也即去掉已经被删除的。
-
-
要给客户端返回下一次要读取消费队列的起始位置,这个起始位置是/20的
-
在拉完消息之后,如果该消费者当前还未拉取的消息的总大小如果大于物理内存的40%,那么这个时候服务端会建议客户端从slave中拉取消息
-
给客户端返回commitlog的最大偏移量和最小偏移量
-
有两种向客户端写的方式,一种是普通的写,这一种是使用零拷贝技术进行写,默认是前者
-
客户端请求中向broker提交offset开关打开,则向消费者offset管理器中提交offset
-
拉消息的挂起
- 如果没有拉取到消息,且客户端是允许挂起(写死是挂起),挂起的时间看,broker支持长轮训的开关是否打开,如果打开则挂起时间由客户端控制挂起时间是15s,否则挂起时间是1s
- 将拉取请求放到拉取请求保管service中
-
offset移动事件
-
拉消息请求保管service
- 是一个独立的线程,保管了挂起的拉取请求
- 如果broker的长轮训开关打开则5s,运行一次,否则就是1s轮训一次
- 执行的操作:获取队列当前最大的offset,之后遍历所有的拉取请求,如果这些拉取请求的要求的offset小于最大offset或者挂起时间到了,则将这些请求重新放到拉取请求处理器中执行
消息到达监听器
- 在reput的过程中,在往消费队列dispatch完消息之后,这个时候消息已经达到了,如果是主且当前是长轮训的话,这个时候就会回调消息到达监听器,消息到达监听去就会主动去执行拉取请求保管service的操作,只是这个时间的操作和多一步消息的过滤,之后当前这个消息符合拉取请求的过滤条件,这个时候拉取请求才会重新执行
配置管理模块
topic配置管理
- topic配置对象,记录了在这个broker中topic的队列数有多少,是不是顺序的topic
- 会创建一些系统级别的topic配置,其中当broker自动创建topic的开关开启,则会创建topic名称为TBW102的topic配置对象
- 不会有一个定时任务去定时持久化,因为这个东西不怎么会变化,所有不需要这样,只要在接收创建或者修改请求的时候,去持久化一下就好了
- 当接收消息的时候,发现消息的topic不存在的时候,那么这个时候会利用TBW102这个topic的配置去创建新的topic对应的topic配置对象,然后做持久化,最后会向ns去注册当前broker的topic配置对象信息
订阅组配置管理
- 用于存储消费组的一些配置信息
- 如果没有订阅组配置,broker的自动创建订阅组开关是打开的(默认打开),则会创建响应的订阅组配置,否则向客户端返回错误
- 不会定期进行持久化,当被调用修改配置的时候,会进行持久化
客户端管理模块
客户端管理处理器
-
处理心跳请求
- 客户端发布的心跳信息包括客户端的id(ip+pid),客户端各生产组的groupName,客户端各消费组的信息(包括,消费组名,消费的topic,消费方式,订阅数据)
- 遍历消费组,生成相应的订阅组配置对象和重试topic(一个消费组会有一个重试topic),往消费者管理器中注册消费者
- 遍历生产组,往生产者管理器中注册生产者
-
处理客户端注销请求
-
这个请求可注销生产者和消费者,就看请求体有生产组和消费组有没有赋值,如果有赋值就注销
-
注销生产者,
- 调用生产者管理器将对应的渠道信息删掉
-
注销消费者
- 调用消费者管理器将对应的渠道信息删除,并且通知消费者id变更监听器有变更事件
-
-
检查消费者是否在broker中
- 先不看和sql过滤有关系
消费者管理器
- 维护了一个map,key为消费组,value为这个消费组的信息ConsumerGroupInfo,ConsumerGroupInfo里面的具体信息包括,这个消费组所订阅的topic和对这个topic的订阅关系,这个消费组下都有哪些消费者,记录了这些消费者的渠道信息
- 在处理心跳的过程会往消费者管理器中注册消费者,注册的过程中,如果发现消费组下的消费者的渠道增加了或者订阅信息变更了,那么这个时候,会回调消费者id变更监听器
- 不定期进行持久化
消费者id变更监听器
-
是一个监听器,消费者管理器中的某个消费组信息发生了变更,那么它的监听方法会被调
-
总共监听三个事件,变更,注册,取消注册
-
变更事件的处理
- 如果订阅组配置中的通知消费者id变更开关打开,且broker中的通知消费者id变更开关打开,默认都是打开的,则会遍历所有的渠道,通过这个些渠道给消费者发送消息,告知这些消费者,现在消费者的id变更了
-
注册事件的处理
- 和消费的sql过滤有关,暂时不看
-
取消注册事件的处理
- 和消费的sql过滤有关,暂时不看
生产者管理器
- 维护了一个map,这个map的key为组名,value为这个生产组下具体的一个个生产者的渠道信息
- 不定期进行持久化
消费者管理处理器
-
获取消费组下消费者列表请求
- 返回一个消费组下所有的消费者的客户端id
-
更新消费者offset
-
获取消费者offset
消费者offset管理
-
消费者offset管理器
- 维护了一个map,这个map记录了某个消费组对某个topic的某个队列的消费offset,表示该消费组队某个队列消费到哪个位置了,这个map定期会持久化到磁盘中
-
消费者管理处理器
- 负责处理消费者查询和更新消费者offset管理器中的map的请求
消费查询模块
根据key查询消息
- 首先去索引中查询出key的hash值相同的消息的offset,之后去commitlog中将相应的消息取出,之后利用内存映射文件做零拷贝方式的写响应,FileRegion
- 这个地方有先将消息读到内存中
- 这个key可以是uni_key也可以是消息中的设置keys
根据commitlog的offset查询消息
- 直接根据offset去commitlog中将消息取出利用FileRegion做零拷贝写响应
- 这个地方没有先将消息读到内存中
事务消息模块
事务消息service
-
写事务消息,将原topic和queueid存到property中,然后将topic改为RMQ_SYS_TRANS_HALF_TOPIC,queueid改为0,之后当成正常消息存储
-
service的check函数
- 遍历half消息,看是否存在对应的op消息,如果不存在则调用事务消息监听器的resolveHalfMsg,如果半消息check次数已经达到15次,则调用事务消息监听器的resolveDiscardMsg
结束事务处理器
-
提交
- 读出半消息,恢复原来的topic和queueid,之后进行消息的存储
- 写一条op消息,消息体为半消息的队列偏移量
-
回滚
- 写一条op消息,消息体为半消息的队列偏移量
事务消息检验service
- 是一个独立线程,定时任务默认每60s执行一次,事务消息service的check函数
事务消息检验监听器
-
resolveHalfMsg
- 反向给生产者发消息查询当前事务的状态
-
resolveDiscardMsg
- 把半消息存到TRANS_CHECK_MAX_TIME_TOPIC这个topic中