client
DefaultMQProducer
new
-
this.namespace = namespace;
-
this.producerGroup = producerGroup;
-
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
-
this.defaultMQProducer = defaultMQProducer;
-
this.rpcHook = rpcHook;
-
构建异步发送消息的线程池
-
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue(50000);
-
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);@Override public Thread newThread(Runnable r) { return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); } 复制代码
});
-
-
start()
-
defaultMQProducerImpl.start();
-
检查配置
-
检查生产组,不符合则抛出异常
- 检验非空
- 必须由 %|a-zA-Z0-9_-所组成
- 不能为DEFAULT_PRODUCER
-
检查生产组,检验非空
-
检查生产组,不能为DEFAULT_PRODUCER
-
-
实例名如果是DEFAULT,则将实例名改成pid
-
获取或者创建客户端工厂
-
从基本客户端配置中构建clientid = ip + pid
-
以clientid为key从factoryTable判断客户工厂是否存在,存在不做处理,不存在则new一个出来
-
new 客户端工厂
-
赋值:客户端公共配置类
-
赋值:客户端工厂索引
-
new netty客户端配置类
-
netty客户端配置类设置,客户端回调线程池线程数
-
netty客户端配置类设置,是否使用tls
-
new 客户端远程处理器ClientRemotingProcessor
-
new 客户端api实现者 MQClientAPIImpl
-
clientConfig赋值
-
topAddressing赋值
-
new NettyRemotingClient
- 构建oneway信号量 异步信号量 默认是65535个permit, 公平
- nettyClientConfig赋值
- channelEventListener赋值为null
- publicExecutor赋值 = 一个固定线程池 大小为nettyClientConfig.getClientCallbackExecutorThreads()默认是4个, 线程名称前缀是NettyClientPublicExecutor_
- eventLoopGroupWorker赋值 构建一个单线程的NioEventLoopGroup,线程名称前缀是NettyClientSelector
- 如果使用tls则构建sslContext
-
clientRemotingProcessor赋值
-
remotingClient注册rpc钩子
-
将clientRemotingProcessor注册进remotingClient中,所有的请求都由该processor处理
- 维护remotingClient中的processorTable,HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>其中线程池用publicExecutor
-
-
NettyRemotingClient的namesrvAddrList赋值
-
赋值:客户端id
-
new MQ管理实现类MQAdminImpl
-
new 拉消息service PullMessageService
-
new 在平衡service RebalanceService
-
以CLIENT_INNER_PRODUCER为生产组 new一个DefaultMQProducer
-
new 一个消息状态管理者 ConsumerStatsManager
-
-
-
-
往客户端工厂注册生产者
-
维护topicPublishInfoTable
-
启动客户端工厂
-
系统变量 -> 环境变量 -> jmenv.taobao.net 依次获取nameserver的地址
-
启动客户端api实现者mQClientAPIImpl
-
启动NettyRemotingClient
-
构建一个DefaultEventExecutorGroup,线程数是为nettyClientConfig.getClientWorkerThreads(),默认为4个,线程名称前缀为NettyClientWorkerThread_
-
正式启动nio客户端
-
设置selector线程组eventLoopGroupWorker
-
设置tcp参数
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_KEEPALIVE, false)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) 默认3s
- .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) 默认65535
- option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) 默认65535
-
设置ChannelInitializer
-
如果有ssl则在加入ssl的渠道处理器,在woker线程中进行处理pipeline.addFirst(defaultEventExecutorGroup, “sslHandler”, sslContext.newHandler(ch.alloc()));
-
加入编码处理器NettyEncoder
-
将RemotingCommand序列化写入netty的ByteBuf
- RemotingCommand表示对一个rocketmq的报文的封装
-
如果失败则关闭连接
-
-
加入解码处理器NettyDecoder
-
NettyDecoder继承自netty 的LengthFieldBasedFrameDecoder
-
NettyDecoder调用父类LengthFieldBasedFrameDecoder的构造函数
- 给maxFrameLength辅助,表示一个帧的最大长度,默认为16777216
- lengthFieldOffset=0,lengthFieldOffset表示消息长度域的起始位置
- lengthFieldLength=4,表示的是长度域的长度
- lengthAdjustment=0,表示在消息长度的基础上在加上lengthAdjustment,为消息的长度
- netty用在长度域后在读取长度域+lengthAdjustment 个字节
- initialBytesToStrip=4,表示netty会将获取到的帧,去掉前4个字节
-
调用父类的解码方法将帧取到(ByteBuf) super.decode(ctx, in)
-
获取从ByteBuf获取java的ByteBuffer,在将其解码成RemotingCommand对象
-
-
加入空闲状态处理器IdleStateHandler
- 如果超过120s没有读写操作,则发出IdleStateEvent事件,由NettyConnectManageHandler关闭连接
-
加入连接管理处理器NettyConnectManageHandler
- 负责连接的管理
- 关闭连接功能
-
加入客户端处理器NettyClientHandler
-
对读到达报文RemotingCommand处理
-
根据RemotingCommand的第一个位判断是请求还是相应
-
如果是请求对请求进行处理
-
根据请求码从processorTable,取出处理该请求的处理器,和处理这个请求所用的线程池,在client中所有的请求由ClientRemotingProcessor处理, 处理线程池是publicExecutor
-
如果会这个请求码不存在相应的处理器,则构建一个响应报文,表示该请求类型不支持
-
调用处理器的rejectRequest方法,如果返回true,则构建一个响应报文表示现在系统繁忙,异步写回去
-
构建一个runnable提交给线程池去执行
-
runnable的内容
- 执行钩子方法doBeforeRequest方法
- 调用处理器的processRequest对请求进行处理
- 执行钩子方法doAfterResponse
- 如果不是oneway的请求,则将处理器放回的响应报文写回到对端
- 如果处理器处理的过程中抛出异常,则构建一个响应报文,表示系统异常,并附上异常类名称和第一层的栈信息,异步写回去
-
-
如果线程池采用拒绝策略,则如果不是oneway请求,则创建一个响应报文表示当前系统繁忙,异步写回去
-
-
如果是响应对响应进行处理
-
从响应RemotingCommand中获取opaque,opaque在一个进程中唯一,用于标识一个进程中的一个请求
-
用opaque从responseTable获取ResponseFuture
-
如果ResponseFuture为空,则打印日志,收到一个响应报文,但是当前没有请求与之对应
-
如果ResponseFuture不为空
-
将响应报文放入responseFuture.setResponseCommand(cmd);
-
responseTable删除该responseFuture
-
依照是同步调用还是异步调用进行处理
-
如果是异步调用则ResponseFuture会有一个回调,则调用该回调
-
如果nettyRemotingClient的callbackExecutor为空则在publicExecutor中进行回调
-
并且释放信号量
- 使得异步请求的窗口变大
-
-
如果是同步调用则没有回调
- 则countdown countdownlatch,使同步等待的线程通过
- responseFuture中的信号量释放
-
-
-
-
-
-
-
-
-
启动定时任务scanResponseTable,对异步调用进行超时处理, 每秒执行一次
-
遍历responseTable中的异步调用
-
如果已经超时
- 释放信号量,在responseTable中将其删除
- 执行回调
-
-
如果channelEventListener不为null,则启动nettyEventExecutor
-
-
-
启动各种定时任务
-
2分钟执行一次fetchNameServerAddr
-
30s 执行一次updateTopicRouteInfoFromNameServer
- 从consumerTable获取所有的消费者的topic
- 从producerTable获取所有的生产者的topic
- 遍历所有的topic调用updateTopicRouteInfoFromNameServer(topic)
-
30s 执行一次和broker的心跳
-
清除下线的broker
- 用可重入锁控制保证以下的操作是串行的
- 遍历brokerAddrTable:ConcurrentMap<String/* Broker Name /, HashMap<Long/ brokerId /, String/ address */>>删除其中不在topic路由table中的(TopicRouteTable)
-
发送心跳给所有的broker
-
用可重入锁lockHeartbeat,进行串行控制
-
发送心跳给所有的broker
-
准备心跳数据
-
clientid赋值
-
构建 Set
- 遍历producerTable获取生产组
-
构建Set
-
遍历consumerTable
- 消费组
- 消费者类型,pull or push
- 消息模式, 集群 or 广播
- ConsumeFromWhere
- 订阅数据Set
- UnitMode
-
-
-
遍历brokerAddrTable调用mQClientAPIImpl.sendHearbeat给所有的broker发送心跳
-
维护brokerVersionTable, 保存每个broker的版本号
-
如果只有生产者,则只给主broker发心跳
-
-
uploadFilterClassSource
-
-
-
5s执行一次offset的持久化
-
调用客户端工厂的persistAllConsumerOffset,对所有的offset进行持久化
-
遍历consumerTable,对所有的消费者调用persistConsumerOffset
-
检查消费者是否启动
-
从rebalanceImpl获取当前正在订阅的消息队列
-
调用offsetstore对象的persistAll,对所有正订阅的消息队列的offset进行持久化
-
广播模式
- 将ConcurrentMap<MessageQueue, AtomicLong> offsetTable 对象导成json串,写到本地文件中
-
集群模式
-
调用updateConsumeOffsetToBroker将offset保存到broker
- 调用mQClientFactory.findBrokerAddressInAdmin,找一个broker,主从都可以
- 调用getMQClientAPIImpl().updateConsumerOffsetOneway
-
维护offsetTable,将现在不在订阅的消息队列移除
-
-
-
-
-
-
一分钟执行一次 线程池的调整
-
-
启动拉消息service,pullMessageService
-
new一个线程,并启动,在该线程中运行,pullMessageService的run方法
-
只要没有stop则做loop
-
从pullRequestQueue take PullRequest
-
拉取消息
-
通过pullRequest中的消费组,获取到相应的消费者
-
对相应的消费者调用拉消息方法(核心逻辑)
-
从pullRequest中获取处理队列,如果处理队列dropped则返回
-
为processQueue设置拉取时间戳,标记上一次进行拉取的时间
-
判断消费是否启动
-
判断消费者是否pause
-
进行流控
-
如果缓存的消息体数大于阈值,则将该pullRequest延迟50毫秒在投入pullRequestQueue,返回
-
如果缓存的消息的大小大于阈值,则将该pullRequest延迟50毫秒在投入pullRequestQueue,返回
-
如果不是顺序模式
- 如果缓存的消息的offset的跨度超过consumeConcurrentlyMaxSpan,默认是2000,将该pullRequest延迟50毫秒在投入pullRequestQueue,返回
-
顺序模式有一个特殊处理
-
pq被锁定
-
pullRequest不是第一次被锁定
- pullRequest.setNextOffset为store中的offset
- 置位第一次被锁定
-
-
pq没有被锁定,则延迟3s在执行pullMessage,rebance时就会lock pq
-
-
-
如果rebalanceImpl.getSubscriptionInner()没有对应的SubscriptionData,则将该pullRequest延迟50毫秒在投入pullRequestQueue,返回
-
构建拉消息回调内部类
-
调用pullAPIWrapper.pullKernelImpl
-
调用findBrokerAddressInSubscribe获取一个broker的地址,可以是主也可以是从
-
如果是sql过滤方式,且broker的版本号低于V4_1_0_SNAPSHOT,则超出异常
-
如果是从 从broker拉取消息,则不在提交offset
-
如果是hasClassFilterFlag,则做一个特殊处理
-
调用mQClientFactory.getMQClientAPIImpl().pullMessage进行消息的拉取
-
构建拉消息的RemotingCommand
-
pullMessageAsync
-
当结果返回时异步回调
-
获取response
-
如果不为空
-
调用MQClientAPIImpl.this.processPullResponse将response解析成pullResult,之后回调pullCallback.onSuccess
-
调用DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult,对相应进行处理
-
维护pullFromWhichNodeTable
-
在客户端进行消息过滤
- 按tag过滤
- 过滤hook进行过滤
-
-
有找到消息
-
维护ConsumerStatsManager的incPullRT
-
给pullRequest的nextOffset赋值为broker返回的NextBeginOffset
-
如果拉回的消息为空则立马将pullRequest投入队列
-
如果拉回的消息不为空
-
维护ConsumerStatsManager().incPullTPS
-
将拉回到消息放置到processQueue的msgTreeMap中
- 如果consuming没有置位,则将其置位
-
调用consumeMessageService.submitConsumeRequest进行消息的消费
-
顺序消费的话ConsumeMessageOrderlyService
-
如果当前的processQueue的不为空,且不正在消费则,构建ComsumeRequest,提交给线程池
-
ConsumeRequest的run方法内容
-
如果processQueue已经drop则返回
-
获取这个消息队列对应的synchronized锁
-
如果是广播模式或者pq被锁定,且没有锁定超时,则不断从pq中取出消息进行消费,直到pq中的消息为空
-
如果pq droped则返回
-
如果是集群模式且pq没有锁定,10ms后尝试锁定的和消费,返回
-
如果是集群模式且pq锁定超时,10ms后尝试锁定的和消费,返回
-
如果消费超时(60s),则10ms后尝试锁定的和消费,返回
-
获取getConsumeMessageBatchMaxSize, 默认为1
-
从pq中取出getConsumeMessageBatchMaxSize个消息
- msgTreeMap中的相关消息转移到consumingMsgOrderlyTreeMap中
-
如果取出的消息不为空,则进行消费
-
构建ConsumeMessageContext,执行hook方法,executeHookBefore
-
获取可重入锁 pq.getLockConsume
-
回调消息监听器进行消费messageListener
-
解锁可重入锁 pq.getLockConsume
-
构建ConsumeReturnType
-
如果listener返回的status=null
- 如果有异常,则为ConsumeReturnType.EXCEPTION
- 否则为ConsumeReturnType.RETURNNULL
-
如果消费超时则为ConsumeReturnType.TIME_OUT
-
如果ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status,则为ConsumeReturnType.FAILED
-
如果ConsumeOrderlyStatus.SUCCESS == status则为ConsumeReturnType.SUCCESS
-
-
将ConsumeReturnType和status赋值进ConsumeMessageContext
-
执行hook方法,executeHookAfter
-
维护ConsumeMessageOrderlyService.this.getConsumerStatsManager()
-
-
-
-
-
-
-
-
-
-
-
-
-
-
.incConsumeRT
– 处理消费结果- 若listener将ConsumeOrderlyContext设置为自动提交 - status为SUCCESS - pq提交 - 清空consumingMsgOrderlyTreeMap - 维护msgCount和msgSize - 返回当前的offset - 维护getConsumerStatsManager().incConsumeOKTPS - status为SUSPEND_CURRENT_QUEUE_A_MOMENT - 维护getConsumerStatsManager().incConsumeFailedTPS - 顺序模式如果不设置最大重试次数,则会无限在客户端重试,否则如果超过最大重试次数会在服务端重试,如果服务端重试失败,则在客户端重试 - 如果在服务端成功则执行pq commit - 如果是客户端重试 - 将消息从consumingMsgOrderlyTreeMap再次转移到msgTreeMap - 之后等待context.getSuspendCurrentQueueTimeMillis,再次提交ComsumeRequest,不再继续消费 - 若listener将ConsumeOrderlyContext设置为不自动提交 - SUCCESS - 维护getConsumerStatsManager().incConsumeOKTPS - SUSPEND_CURRENT_QUEUE_A_MOMENT - 维护this.getConsumerStatsManager().incConsumeFailedTPS - 顺序模式如果不设置最大重试次数,则会无限在客户端重试,否则如果超过最大重试次数会在服务端重试,如果服务端重试失败,则在客户端重试 - 如果是客户端重试 - 将消息从consumingMsgOrderlyTreeMap再次转移到msgTreeMap - 之后等待context.getSuspendCurrentQueueTimeMillis,再次提交ComsumeRequest,再消费该条消息,退出该runnable - 维护offsetStore的offsetTable - 消费成功或者消费失败但服务端重试成功则继续消费,否则不再消费 - 如果取出的消息为空,则将pq的consuming = false;返回 - 否则 - 判断是否pq,dropped,如果是则返回 - 延迟100毫秒,再尝试锁定和消费(线程名称前缀为ConsumeMessageScheduledThread_) - 延时任务的run方法的内容 - 锁定队列 - 如果ConsumeMessageOrderlyService没有stop则执行锁定,getRebalanceImpl().lock(mq) - 获取主broker的地址 - 在主broker上执行锁定该消费队列 - 设置pq的locked和lastLockTimestamp - processQueue.setLocked(true); - processQueue.setLastLockTimestamp(System.currentTimeMillis()); - 锁定成功 - 延迟10ms在提交ConsumeRequest - 锁定失败 - 延迟3s中提交ConsumeRequest - ConsumeMessageConcurrentlyService - 用consumeBatchSize去分隔消息数组,构建消费请求对象ComsumeRequest - 将ComsumeRequest提交给线程池(consumeExecutor)执行 - 如果processQueue已经dropped则返回 - 将消息的topic设置成重试topic - 构建ConsumeMessageContext,调用消费消息钩子ConsumeMessageHook的executeHookBefore - 遍历所有的消息,设置ConsumeStartTimeStamp属性 - 回调listener的consumeMessage - 构建ConsumeReturnType枚举 - 如果调用listener异常则为EXCEPTION - 如果listener返回null,则为RETURNNULL - 如果消费时间超过getConsumeTimeout,则为TIME_OUT,默认为15分钟 - 返回RECONSUME_LATER则为FAILED - 返回CONSUME_SUCCESS则为SUCCESS - 保存消费结果到ConsumeMessageContext,调用ConsumeMessageHook的executeHookAfter - 维护ConsumerStatsManager() 复制代码
-
-
-
-
.incConsumeRT
– 如果processQueue没有dropped,则处理消费结果- 计算ackIndex的值,这个值表示的是从index + 1开始的消费都消费失败 - 如果返回的是RECONSUME_LATER, 则ackIndex = -1,表示所有的消费都消费失败 - 如果返回的是CONSUME_SUCCESS,则从ConsumeConcurrentlyContext取出该值, 如果用户未设置该值,其初始值是Integer.MAX_VALUE,表示所有的消息都消费成功 - 根据消费结果维护统计信息 - 消费成功维护getConsumerStatsManager().incConsumeOKTPS和getConsumerStatsManager().incConsumeFailedTPS - 消费失败维护getConsumerStatsManager().incConsumeFailedTPS - 对消费失败的消息进行处理 - 广播模式 - 只打印日志,不会再重试 - 集群模式 - 将消费失败的消息发送会服务端 - 收集发送失败的消息,将这些消息的消费次数 + 1,之后延迟5s中,在投入给consumeExecutor去消费 - 从processQueue中删除这些消息,但是不包括在客户端重试的消息 - 维护消费者的offsetStore中的offset信息 - offset表示当前应该从这个偏移量从broker拉取消息,offset之前的消息都已经消费成功 - 将pullRequest延后PullInterval()放入队列 - 没有匹配的消息or 没有新的消息 - 更新pullRequest的nextoffset = pullResult.getNextBeginOffset - 将pullRequest投入队列 - 如果ProcessQueue用于存储未被消费的消息的msgTreeMap为空,则将offsetstore对象的offset更新为pullResult.getNextBeginOffset - 错误的offset - 将processQueue drop表明当前消费者不在消费这个队列 - 之后在10s后执行 - 将该队列的offset持久化为pullResult中的NextBeginOffset - 删除processQueueTable中对应的消息队列 - 之后会引起在平衡 - 未空则进行异常处理 - 延迟3s在将该pullRequest投入队列 复制代码
-
-
-
启动在平衡service,rebalanceService
-
new一个线程,并启动,在该线程中运行,rebalanceService的run方法
-
只要没有stop则做loop
-
等待20s
-
执行客户端工厂的doRebalance方法
-
遍历consumerTable,对所有的消费者调用doRebalance
-
如果pause没有被置位,则调用rebalanceImpl.doRebalance
-
遍历subscriptionInner,获取该消费者订阅的所有topic,以topic为维度进行再平衡,rebalanceByTopic
-
rebalanceByTopic
-
对消息队列进行分配
-
广播模式
- 无需分配,topicSubscribeInfoTable该topic对应的所有的队列,即为分配到的所有队列
-
集群模式
- 从topicSubscribeInfoTable该topic对应的所有的队列
- 调用客户端工厂的findConsumerIdList获取这个topic,该消费组所有的消费者id
- 调用消息队列分配策略(AllocateMessageQueueStrategy)对消息队列进行分配
-
-
对处理队列table 进行更新 updateProcessQueueTable
-
删除已经不再订阅,或者已经拉取超时的消息队列
-
获取offsetstore对象将当前的offset保存
-
维护offsetstore对象中offsetTable,删除对应的消息队列
-
集群模式且是顺序模式有个特殊处理
-
如果在一秒钟之内可以拿到pq.getLockConsume(),返回true,可以删除消息队列
-
以oneWay的方式到broker去解除锁定
- 如果当前pq有消息则延迟20,解决锁定
- 没有消息则立马解决锁定
-
-
如果拿不到,则返回false,不可以删除消息队列
-
-
从processQueueTable中删除消息队列
-
-
处理新订阅的消息队列
-
顺序模式有个特殊处理
-
对mq进行锁定
- 服务端锁定
- processQueue.setLocked(true);
-
-
-
-
-
-
-
processQueue.setLastLockTimestamp(System.currentTimeMillis());
- 锁定失败则略过该消息队列 - 对新订阅的消息队列计算offset,并维护offsetStore中的offsetTable - 获取消费者中配置的ConsumeFromWhere枚举 - 获取消费者的OffsetStore对象 - 根据所配置的ConsumeFromWhere计算offset - CONSUME_FROM_LAST_OFFSET的计算 - 调用offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) - 广播模式 - 从文件中获取offset - 维护offsetStore中的offsetTable:ConcurrentMap<MessageQueue, AtomicLong> - 集群模式 - 从Broker获取这个消息队列的offset - 维护offsetStore中的offsetTable:ConcurrentMap<MessageQueue, AtomicLong> - 如果没有offset - 如果是重试队列的话,则offset = 0 - 否则调用mQClientFactory.getMQAdminImpl().maxOffset(mq) - CONSUME_FROM_FIRST_OFFSET的计算 - CONSUME_FROM_TIMESTAMP的计算 - 增加新订阅的消息队列 - 对新增的消息队列构建pullRequest - 遍历pullRequest,依次执行pull消息 - 将pullRequest投入到客户端工厂的PullMessageService的pullRequestQueue中 - 如果在平衡后分配的消息队列和之前有分配的有变更,否则对各种信息进行维护,并发送心跳 复制代码
-
-
-
-
-
启动生产组是CLIENT_INNER_PRODUCER的DefaultMQProducer,该调用不会启动客户端工厂
-
-
-
处理traceDispatcher
shutdown
-
this.defaultMQProducerImpl.shutdown();
-
DefaultMQProducerImpl的状态是ServiceState的RUNNING才进行处理
-
在MQ客户端工厂中取消注册
- this.producerTable.remove(group);
- 告知所有的broker,该生产者取消注册了
-
关闭异步发送消息的线程池
-
关闭MQ客户端工厂
-
判断当前消费者是否关闭了之后,是否可以关闭客户端工厂
- !this.consumerTable.isEmpty() 不关闭
- !this.adminExtTable.isEmpty()不关闭
- this.producerTable.size() > 1不关闭
-
关闭MQ客户端工厂
-
客户端工厂的状态是RUNNING才关闭
-
关闭默认的生产者,但是不关闭客户端工厂this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
-
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
-
关闭拉取消息service,pullMessageService
-
关闭父类ServiceThread的关闭接口
- start设为false
- stop设置true
- hasNotified设置true
- countdownlatch countdown
- 打断pullMessageService线程
- 如果不是守护线程,当前线程等待pullMessageService线程结束
-
关闭定时任务执行器
-
-
关闭定时任务线程池
-
关闭再平衡service,this.rebalanceService.shutdown();
-
this.mQClientAPIImpl.shutdown();
- 关闭所有的channel
- 清空channelTables
- 关闭eventLoopGroupWorker
- nettyEventExecutor != null,关闭nettyEventExecutor
- 关闭defaultEventExecutorGroup
- 关闭publicExecutor
-
UDP socket如果不为空,则关闭udp socket
-
从工厂管理器哪里删除该工厂,MQClientManager.getInstance().removeClientFactory(this.clientId);
-
-
-
-
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
-
-
-
如果追踪消息,则traceDispatcher.shutdown();
SendResult send(
Message msg)
复制代码
-
消息校验
-
消息非空检验
-
topic校验
- topic非空检验
- topic a-zA-Z0-9_- 必须是这些字符
- topic长度校验 长度不可以超过255
- topic 不可为TBW102
-
body非空校验
-
bod大小检验,不得超过4m
-
-
defaultMQProducerImpl.send(msg)
-
send(Message msg, long timeout)
- sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout)- 检验发送者实现是否已经启动 - 寻找topic发布信息TopicPublishInfo: tryToFindTopicPublishInfo - 看topicPublishInfoTable中是否存在对应的TopicPublishInfo,有且有效则返回 - 否则,从ns更新topic路由信息,通过调用客户端工厂的updateTopicRouteInfoFromNameServer(topic) - 用可重入锁控制,保证该操作是串行的 - 获取TopicRouteData,通过调用mQClientAPIImpl.getTopicRouteInfoFromNameServer - 如果有改变则返回true - 维护topicRouteTable:ConcurrentMap<String/* Topic */, TopicRouteData> - 维护brokerAddrTable:ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> - 将TopicRouteData转化为TopicPublishInfo - 维护每一个DefaultMQProducerImpl的topicPublishInfoTable:ConcurrentMap<String/* topic */, TopicPublishInfo> - 将TopicRouteData转化为Set<MessageQueue> - 维护每一个DefaultMQPushConsumerImpl或者DefaultMQPullConsumerImpl的this.rebalanceImpl.topicSubscribeInfoTable:ConcurrentMap<String/* topic */, Set<MessageQueue>> - 没改变则返回false - 如果上异步获取到有效的TopicPublishInfo则返回 - 否则调用updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer) - 获取TBW102的TopicPublishInfo - 选择一个消息队列 - 发送消息 sendKernelImpl - 找一个主broker的地址 - 如果缓存中没有,则调用tryToFindTopicPublishInfo()获取一次 - VIP渠道有一个特殊处理 - 如果消息不是MessageBatch,则给消息的设置uniid - id=ip+pid + 类加载器的hashcode + 时间戳 + 一个递增的数值 - 如果namespace不为null,则把msg.InstanceId赋值为namespace - 压缩消息 - 如果是MessageBatch,不压缩 - 如果消息超过his.defaultMQProducer.getCompressMsgBodyOverHowmuch()则压缩,默认为4k - 用java.util.zip.Deflater进行压缩 - 如果有压缩则设置压缩flag位 - 如果消息有 TRAN_MSG这个property,且值为true,则表示是事务消息,则设置事务flag位 - 如果有CheckForbiddenHook,则遍历CheckForbiddenHook list,执行checkForbidden - 如果有发送消息hook, 遍历hook,执行sendMessageBefore - 构建SendMessageRequestHeader - 设置其他的属性 - 将message的除body的属性设置进SendMessageRequestHeader - 如果发送的是重试消息 - setReconsumeTimes - setMaxReconsumeTimes - 根据通信模式进行消息的发送 - 异步方式的发送 - 超时判断 - mQClientFactory.getMQClientAPIImpl().sendMessage( 复制代码
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
)
- 构建RemotingCommand - 如果是sendSmartMsg (默认为true)或者 是批量消息,则用SendMessageRequestHeaderV2替换与原来的SendMessageRequestHeader,来构建RemotingCommand - 否则用SendMessageRequestHeader来构建RemotingCommand - 将message的body赋值到RemotingCommand里面 - 根据通信模式进行消费的发送 - oneway调用 - this.remotingClient.invokeOneway - 获取或者创建渠道 - 执行钩子doBeforeRequest - invokeOnewayImpl - 将请求的报文的flag的oneway位置位 - 在超时时间里获取oneway信号量 - 调用异步写,并加入监听器,写操作完成的时候会回调channel.writeAndFlush(request).addListener - 监听器的内容 - 释放信号量 - 如果写不成功,打印日志 - 如果在超时内没有获取到信号量,则抛出异常 - 超时间为=0 - 则说明超时时间设的太短了,抛出RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast") - 超时间大于0 - 则抛出RemotingTimeoutException并且附上目前oneway信号量的信息 - 返回null - 异步调用 - 超时判断 - 调用sendMessageAsync - new一个InvokeCallback,然后remotingClient 调用invokeAsync - 获取或者创建连接 - 获取不到有效连接则关闭连接,抛出异常 - 调用钩子 - 如果超时则抛出异常 - invokeAsyncImpl - 在超时间内获取异步调用的信号量,默认是65535,也就是同一个时间最多允许65535个异步调用 - 如果在超时内没有获取到信号量,则抛出异常 - 超时间为=0 - 则说明超时时间设的太短了,抛出RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast") - 超时间大于0 - 则抛出RemotingTimeoutException并且附上目前异步信号量的信息 - 获取到信号量执行异步调用 - 超时处理 - 如果超时则,释放信号量,并抛出异常RemotingTimeoutException("invokeAsyncImpl call timeout"); - 构建ResponseFuture - channel - opaque - timeoutMillis - invokeCallback - SemaphoreReleaseOnlyOnce - put进responseTable - this.responseTable.put(opaque, responseFuture); - 调用异步写,并加入监听器,写操作完成的时候会回调channel.writeAndFlush(request).addListener - 监听器的内容 - 如果写成功则设置发送请求成功responseFuture.setSendRequestOK(true); - 失败,进行失败处理 - 从responseTable移除 - responseFuture 设置发送请求失败responseFuture.setSendRequestOK(false); - 设置响应报文为空 - 执行回调 - 在回调线程池中进行回调,callbackExecutor为null,则用publicExecutor - 释放信号量 - InvokeCallback的内容 - 如果sendCallback为null,且response不为null - 从响应报文解析出SendResult, MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); - 将SendResult设置进SendMessageContext中 - 回调钩子方法sendMessageAfter - 维护发送者的失败策略 - 如果sendCallback不为null,且response不为null - 从响应报文解析出SendResult, MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); - 将SendResult设置进SendMessageContext中 - 回调钩子方法sendMessageAfter - 回调sendCallback的onSuceess方法,sendCallback.onSuccess(sendResult); - 维护发送者的失败策略 - 如果response=null - 维护发送者的失败策略 - 进行失败原因分析 - 构建异常 - 如果写请求失败,new MQClientException("send request failed", responseFuture.getCause()); - 如果写成功,但是超时没有返回结果,new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause()); - 否则new MQClientException("unknow reseaon", responseFuture.getCause()); - 将异常设置进SendMessageContext - 调用钩子sendMessageAfter - 回调sendCallback.onException方法sendCallback.onException(e); - 返回空 - 同步调用 - 超时处理 - this.remotingClient.invokeSync - 获取或者创建渠道channel,就是连接 - 如果地址为null,表明是要获取或创建ns的链接 - 从namesrvAddrChoosed原子引用中获取ns地址 - 如果ns地址不为null,则从channelTables中看是否存在连接,存在且ok,则返回 - 串行创建和ns的链接 - 如果ns地址不为null,则从channelTables中看是否存在连接,存在且ok,则返回 - 遍历namesrvAddrList创建和ns的链接 - 从namesrvAddrList选择一个ns作为namesrvAddrChoosed - 创建连接 - 否则则获取或创建和broker的连接 - 看ConcurrentMap<String /* addr */, ChannelWrapper> channelTables中是否存在连接 - 所以在一个java进程中到一个broker中只有一个连接 - 如果存在,且连接是active则返回连接 - 否则创建连接,返回null表示创建连接失败 - channelTables如果中存在该连接,且连接时ok的,则关掉该连接,且删除该连接 - 串行创建连接 - 获取lockChannelTables,可重入锁,并做3秒的超时 - 抢到锁 - 再次检查channelTables是否存在该连接 - 不存在则创建 - ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); - cw = new ChannelWrapper(channelFuture); - this.channelTables.put(addr, cw); - 存在,且ok则关闭连接->删除->创建 - 存在,但还未完成3握手,则不创建 - 在超时间内没有抢到锁 打印日志 - 如果cw为空,则返回为null - cw不为空,则等待连接完成 - 在连接超时的时间内等待连接完成,this.nettyClientConfig.getConnectTimeoutMillis()) - 如果在超时内连接完成 - 连接成功,返回channel - 连接失败,打印日志返回null - 超时时间内没有完成连接,返回null - 如果连接不active则关闭连接,并抛出异常 - 否则进行同步调用 - 执行rpc钩子的doBeforeRequest方法 - 超时处理 - 同步调用invokeSyncImpl,netty所有的io操作都是异步的,所以这个地方构建一个ResponseFuture,用一个CountDownLatch 做一个等待,如果写失败,则在回调线程里面释放countDownLatch,否则等收到写响应报文后释放countDownLatch - 构建ResponseFuture - 用opaque为key, ResponseFuture 为value,put进NettyRemotingAbstract.responseTable里面 - 异步写RemotingCommand,channel.writeAndFlush(request) - add listenser用于监听写操作完成 - 如果写操作成功则responseFuture.setSendRequestOK(true);返回,主线程继续等待,等待响应报文的来临 - 如果写失败 - esponseFuture.setSendRequestOK(false) - responseTable删除该responseFuture - responseFuture设置异常原因responseFuture.setCause(f.cause()); - responseFuture设置response=null,同时释放countdownlatch,主线程返回 - 用countdownlatch 做超时等待 - 从countdownlatch 做超时等待的超时中返回 - countdownlatch超时到 - 超时时间未到, 写失败 - 超时未到,写成功,并读取到响应报文 - 如果响应报文为空则超出异常 - 如果写成功则抛出RemotingTimeoutException, 表明是broker出现问题 - 如果写失败则抛出RemotingSendRequestException,发送请求异常 - 不为空则返回响应报文 - finally中 this.responseTable.remove(opaque); - 调用rpc钩子的doAfterResponse - 返回response - 将响应报文RemotingCommand转化为SendResult - 如果响应码,不是ResponseCode.FLUSH_DISK_TIMEOUT,ResponseCode.FLUSH_SLAVE_TIMEOUT,ResponseCode.SLAVE_NOT_AVAILABLE,ResponseCode.SUCCESS中的一种,则抛出异常MQBrokerException - 否则组装SendResult对象 - 根据响应码构建SendStatus - 从响应报文的extFields中解析出SendMessageResponseHeader对象 - SendStatus赋值 - msgId=uniqMsgId - offsetMsgId=服务端返回的offsetMsgId - queueOffset赋值 - messageQueue赋值 - transactionId赋值 - regionId赋值 - traceon赋值 - 返回SendResult对象 - 同步和oneway方式的发送 - 超时判断 - mQClientFactory.getMQClientAPIImpl().sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendMessageContext context, final DefaultMQProducerImpl producer 复制代码
)
- 如果有发送消息hook,则遍历hook,执行sendMessageAfter - 重试处理 - 是否换另外一个broker - 超时处理 - 异常处理 复制代码
-
异步发送void send(Message msg,SendCallback sendCallback)
- 在异步发送线程池中调用sendDefaultImpl
oneway发送void sendOneway(Message msg)
-
msg.setTopic(withNamespace(msg.getTopic()));
-
this.defaultMQProducerImpl.sendOneway(msg);
- this.sendDefaultImpl
批量发送消息SendResult send(
Collection<Message> msgs)
复制代码
-
构建MessageBatch
-
生成MessageBatch
-
校验
- 不能有延迟消息
- 不能有重试消息
- topic需要相同
- isWaitStoreMsgOK属性也需要相同
-
new MessageBatch
-
messageBatch.setTopic(first.getTopic());
-
messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
-
-
遍历所有消息进行消息校验 Validators.checkMessage(message, this);
-
遍历所有消息设置UNIQ_KEY属性
-
把所有的消息都进行编码然后存在message的body字段里面,这个一个消息就变成了一个普通的message,接下来就和传message一样去传这批量消息
-
-
同步发送message
发送事务消息
-
如果事务监听器没有设置,则抛出异常
-
为topic设置namespace
-
发送事务消息
-
检验消息
-
设置事务属性true
-
设置生产组属性
-
同步发送消息
-
处理发送结果
-
执行本地事务
-
如果发送成功
- 设属性__transactionId__
- 设置属性transactionId为消息的unikey
- 回调事务监听器,执行本地事务,并获取本地事务状态
-
如果发送结果为FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE中的一种,则本地事务状态为回滚消息LocalTransactionState.ROLLBACK_MESSAGE
-
其他的事务状态为未知LocalTransactionState.UNKNOW
-
-
发送本地事务执行结果
-
从offsetMsgId或者MsgId中解码出MessageId
-
找主broker出来
-
构建EndTransactionRequestHeader
-
设置事务id
-
设置commitLogOffset,requestHeader.setCommitLogOffset(id.getOffset());
-
根据本地事务状态设置setCommitOrRollback
-
COMMIT_MESSAGE
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
-
ROLLBACK_MESSAGE
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
-
UNKNOW
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
-
-
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
-
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
-
requestHeader.setMsgId(sendResult.getMsgId());
-
-
调用remotingClient.invokeOneway将EndTransactionRequestHeader发送出去
-
-
-
client发送的请求和处理的请求
client
- 取消注册,RequestCode.UNREGISTER_CLIENT
- 本地事务结束请求,RequestCode.END_TRANSACTION
- 发送消息,RequestCode.SEND_BATCH_MESSAGE,RequestCode.SEND_MESSAGE_V2,RequestCode.SEND_MESSAGE
- 给ns发送topic获取ns返回的路由信息,RequestCode.GET_ROUTEINTO_BY_TOPIC,ns返回Topic路由信息
- 给某一个broker发送消费组获取消费组下具体都有哪些机器,RequestCode.GET_CONSUMER_LIST_BY_GROUP
- 获取offset,查询消费组对某个队列的消费的offset,RequestCode.QUERY_CONSUMER_OFFSET
- 获取某个topic下的某个队列的最大offset,RequestCode.GET_MAX_OFFSET
- 拉取消息,RequestCode.PULL_MESSAGE
- 顺序模式进行队列锁定RequestCode.LOCK_BATCH_MQ
- 发送心跳RequestCode.HEART_BEAT
server
-
如果请求码是CHECK_TRANSACTION_STATE,则调用checkTransactionState
-
子主题 从响应报文中解码出CheckTransactionStateRequestHeader1
-
从响应报文中解码出MessageExt
-
从messageExt出去UNIQ_KEY属性,如果不为空串,则将其给setTransactionId赋值
-
从messageExt取出PROPERTY_PRODUCER_GROUP属性,即为生产组
-
用生产组从客户端工厂取出响应的生产者
-
之后对生产者调用checkTransactionState
-
构建一个runnable提交给checkExecutor线程池去执行
-
runnable的内容
-
获取事务监听器
-
回调事务监听器的checkLocalTransaction方法,获取事务状态
-
对事务状态进行处理
-
构建EndTransactionRequestHeader
- thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
- thisHeader.setProducerGroup(producerGroup);
- thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
- thisHeader.setFromTransactionCheck(true);
- thisHeader.setMsgId(uniqueKey);
- thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
- 根据事务状态设置setCommitOrRollback
-
如果事务check有一场抛出则记录异常信息remark = “checkLocalTransactionState Exception: ” + RemotingHelper.exceptionSimpleDesc(exception);
-
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);
- 构建请求报文RemotingCommand
- this.remotingClient.invokeOneway(addr, request, timeoutMillis);
-
-
-
-
-
返回null
-
-
通知消费者id变更,请求码RequestCode.NOTIFY_CONSUMER_IDS_CHANGED
- 立即执行在平衡this.rebalanceService.wakeup();
- return null
-
重置消费者的offset,RequestCode.RESET_CONSUMER_CLIENT_OFFSET
-
从请求报文中,解析出topic, group, offsetTable(Map<MessageQueue, Long>)
-
从consumerTable获取相应的消费者
-
挂起消费者,将pause置为true
-
将要重置offset的MessageQueue先停止消费
- ProcessQueue.setDropped(true);
- ProcessQueue.clear();
-
睡10s
-
重新设置相关消费队列的offset
-
将这些offset持久化
-
删去这些offset
-
删去messageQueue,和processQueue
-
pause = false
-
在平衡
-
return null
-
-
获取客户端消费offset,RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT
- 获取ConcurrentMap<MessageQueue, AtomicLong> offsetTable
- 将offsetTable封装进RemotingCommand
- return RemotingCommand
-
获取消费者运行信息,RequestCode.GET_CONSUMER_RUNNING_INFO
-
从RemotingCommand中解码出GetConsumerRunningInfoRequestHeader,获取消费组
-
根据消费组获取相应消费者的运行信息ConsumerRunningInfo
-
从consumerTable中获取消费者MQConsumerInner
-
从消费者MQConsumerInner中获取消费者运行信息ConsumerRunningInfo
-
pull处理
-
push处理
-
new ConsumerRunningInfo
-
ConsumerRunningInfo设置 是否顺序消费
-
ConsumerRunningInfo设置消费线程池的核心线程数
-
ConsumerRunningInfo获取消费者的启动时间
-
ConsumerRunningInfo设置了消费者的订阅信息
-
ConsumerRunningInfo设置队列消费情况,当前的offset
-
ConsumerRunningInfo设置该消费者的每个topic的消费信息
- 从统计模块中获取该信息ConsumerStatsManager
-
-
-
获取ns地址列表
-
ConsumerRunningInfo设置ns地址列表
-
ConsumerRunningInfo设置消费类型 pull or push
-
ConsumerRunningInfo获取客户端版本
-
-
如果ConsumerRunningInfo不为null, 则判断是否要jstack信息,如果要则在响应报文上,设置jstack
-
调用Thread的static方法getAllStackTraces获取当前所有线程的堆栈信息 Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
-
将线程堆栈信息转化为string,设置进ConsumerRunningInfo
- 遍历所有线程的堆栈信息StackTraceElement[],StackTraceElement.toString(),将所有的线程的堆栈信息合成一个字符串
-
-
如果为null,则返回ResponseCode.SYSTEM_ERROR,且附上错误信息
-
-
直接消费某个消息RequestCode.CONSUME_MESSAGE_DIRECTLY
-
从响应报文中解码出ConsumeMessageDirectlyResultRequestHeader和MessageExt
-
消费消息
-
从consumerTable获取响应消费这
-
consumer.getConsumeMessageService().consumeMessageDirectly
-
并发消费
- ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult()
- result.setOrder(false);
- result.setAutoCommit(true);
- 调用listener对消息进行消费
- 设置消费结果
- 设置消费耗时
-
顺序消费
- ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult()
- result.setOrder(true);
- 调用listener进行消费
- 设置消费结果
- 设置是否自动提交,顺序消费是可以在listener中设置是否自动提交,默认是自动提交
- 设置消费时间
-
-
-
如果消费结果为null,则返回ResponseCode.SYSTEM_ERROR
-
如果消费结果不为null,则返回ResponseCode.SUCCESS,并将消费结果写会去
-
DefaultMQPushConsumer
subscribe(String topic, String subExpression)
-
defaultMQPushConsumerImpl.subscribe
-
构建订阅数据类SubscriptionData
- 订阅表达式如果为空,则设置为*
- 如果订阅表达式不为空,则用||将订阅表达是分隔,将tag加入Set tagsSet中将tag的hashcode加入Set
-
维护rebalanceImpl的ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner
-
调用客户端工厂给所有的broker发送心跳
-
registerMessageListener
- DefaultMQPushConsumer赋值
- DefaultMQPushConsumerImpl赋值
start()
-
defaultMQPushConsumerImpl.start()
-
检查配置
-
消费组校验
- 消费组不可为空
- 消费主必须满足^[%|a-zA-Z0-9_-]+$
- 长度不可超过255
- 消费组不可为DEFAULT_CONSUMER
-
消息模式不可以为空
-
ConsumeFromWhere不可为空
-
时间戳不可为空
-
AllocateMessageQueueStrategy不可为空
-
订阅信息subscription不可为空
-
messageListener不可为空
-
messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently
-
consumeThreadMin Out of range [1, 1000]
-
consumeThreadMax Out of range [1, 1000]
-
consumeThreadMin can’t be larger than consumeThreadMax
-
consumeConcurrentlyMaxSpan Out of range [1, 65535]
-
pullThresholdForQueue Out of range [1, 65535]
-
pullThresholdForTopic Out of range [1, 6553500]
-
pullThresholdSizeForQueue Out of range [1, 1024]
-
pullThresholdSizeForTopic Out of range [1, 102400]
-
pullInterval Out of range [0, 65535]
-
consumeMessageBatchMaxSize Out of range [1, 1024]
-
pullBatchSize Out of range [1, 1024]
-
-
赋值订阅信息
-
如果defaultMQPushConsumer Map<String /* topic /, String / sub expression */> subscription不为空,则将这些数据拷贝到rebalanceImpl的subscriptionInner中
-
如果是集群模式
-
构造retryTopic
- %RETRY%+消费组
-
构造SubscriptionData
-
维护到rebalanceImpl的subscriptionInner中
-
-
-
如果是集群模式,实例名如果是DEFAULT,则将实例名改成pid
-
获取或者创建客户端工厂
-
给rebalanceImpl赋值
- 消费组
- 消息模式,广播或者集群
- 消息队列分配策略
- 客户端工厂
-
构建PullAPIWrapper
-
构建offsetStore
-
如果是集群模式
- RemoteBrokerOffsetStore
-
如果是广播模式
- LocalFileOffsetStore
-
-
offsetStore.load
-
集群模式
- 方法体为空
-
广播模式
- 从本地文件中读出ConcurrentMap<MessageQueue, AtomicLong> offsetTable并加载进offsetstore
-
-
构建ConsumeMessageService
-
如果是MessageListenerOrderly
- 构建ConsumeMessageOrderlyService
-
如果是MessageListenerConcurrently
- 构建ConsumeMessageConcurrentlyService
-
-
启动ConsumeMessageService
-
往客户端工厂中注册消费者
-
启动客户端工厂
-
更新订阅消息,当订阅信息改变
-
checkClientInBroker
-
给所有的broker发送心跳
-
立即在平衡
-
-
traceDispatcher处理