前言
在服务发现中,我们知道Broker在启动时会开启定时任务向每个NameServer注册自己的路由信息,而Producer启动时也会根据Topic向NameServer拉取更新Topic对应的路由信息到本地,这个路由信息从上篇文章我们知道就是TopicRouteData,里面包含了BrokerDatas和QueueDatas。当Producer要发送某一消息时,很显然就会先从本地获取Topic对应的路由,那如果本地找不到会怎样呢?如果找到了TopicRouteData,一个topic对应多个QueueData,每个Queue记录了自己所属的Broker,那么Topic是怎么选出对应的路由信息,又是采用什么样的负载均衡的策略选出某个Queue的呢?本文和你一探究竟。
探索消息发送
我们使用RocketMq发送消息的代码如下:
DefaultMQProducer producer = new DefaultMQProducer(topic);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message(topic,"testTag","testKey","testMessage".getBytes());
SendResult result = producer.send(msg);
复制代码
发送消息就是从DefaultMQProducer.send()开始的
1、看到先是校验了消息的合法性,还是使用的defaultMQProducerImpl的send()
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
复制代码
2、真正的消息发送就在DefaultMqProducerImpl.sendDefaultImpl()方法
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
...前面都是一些检查
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
...后面是一些异常处理
}
复制代码
3、我们看到首先调用的tryToFindTopicPublishInfo(topic),根据消息的topic获取topic的路由信息。看看这个方法里干了啥
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
复制代码
- 这个方法先从topicPublishInfoTable里拿TopicPublishInfo,如果没有的话调用mQClientFactory.updateTopicRouteInfoFromNameServer(topic)从远程nameServer获取topic的路由信息,这在服务发现一文中已经分析过了。然后再把远程拿到的放进topicPublishInfoTable中。
- 精髓来了,不是说nameServer一定有topic路由信息,如果你没有配置topic,而且消息也是第一次发送的话,nameServer也没有该topic的路由。这里就执行的是this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);再看看这个熟悉的方法updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer)
因此此时isDefault=true,而且defaultMQProducer!=null,所以就走到下面这个分支,调用getDefaultTopicRouteInfoFromNameServer获取默认topic的路由信息。
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}
复制代码
这个createTopicKey我们看到是指定的一个Topic,叫做“TBW102”,它会在broker允许自动创建topic的时候自动创建。
到这里我们就明白了,根据topic获取路由信息。先从本地获取,本地获取到了就返回。否则再走一遍服务发现的逻辑,从远程NameServer获取,获取到了就返回。如果NameServer也获取不到,那就使用默认的Topic(TBW102)的路由信息作为暂时路由,并插入到本地路由表,当TopicX的消息到达Broker之后,Broker发现没有该Topic的路由,就会自动创建,然后同步到NameServer。
4、下面就是选择一个Queue进行发送消息了。在服务发现中我们知道NameServer返回的是TopicRouteData,里面由queueDatas和brokerDatas。queueData中包含了Topic对应的所有Queue信息,结构如下:
public class QueueData implements Comparable<QueueData> {
private String brokerName; //Queue所属的Broker
private int readQueueNums; //该Broker上,对于该Topic配置的读队列个数
private int writeQueueNums; //该Broker上,对于该Topic配置的写队列个数
private int perm;
private int topicSynFlag;
}
复制代码
对于RocketMq,Queue是比较抽象的一个概念,并不是说某个具体的队列。Topic、QueuData以及Broker是1:1:1的,QueueData本质上是记录了某个Topic在某个Broker上的所有路由信息。
在前面一步中,当生产者从NameServer获取到Topic对应的TopicRouteData时,会将其转换成TopicPublishInfo,存放在本地路由表中。
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
复制代码
在topicRouteData2TopicPublishInfo内部,会遍历TopicRouteData的QueueData,按照配置的读写队列的个数,生成MessageQueue,存放在TopicPublishInfo中。
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
复制代码
5、再回到发送消息的地方。只要times < timesTotal不超过重试次数,以及timeout < costTime,在超时时间范围内,都可以进行重试发送。
6、this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);就是负载均衡选择一个MessageQueue进行发送。选择Queue的逻辑就在TopicPublishInfo的selectOneMessageQueue(lastBrokerName)方法内
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
复制代码
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
复制代码
可以看出,负载均衡的策略是计数器的取模算法。在lastBrokerName为空时,对计数器自增,计数器的值对messaeQueue列表个数取模,获得对应下标的MessageQueue。在lastBrokerName不为空时,先对计数器自增,然后遍历messaeQueue列表,计数器的值对messageQueue列表个数取模获得对应的下标,如果对应下标的MessageQueue还是lastBroker,就再次循环,否则返回对应下标的Broker。
7、接下来就是消息发送的核心流程,在DefaultMqProducerImpl.sendKernelImpl()方法里
首先要获取Queue对应的Broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
复制代码
拿到Broker地址后,要将消息内容及其他消息封装进请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
...
复制代码
接着调用MQClientAPIImpl的sendMessage()方法
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
复制代码
在sendMessage()里面就是封装请求,调用封装的Netty进行网络传输了。
首先是封装请求RemotingCommand,设置消息内容。
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
复制代码
然后根据发送的方式是单向、异步还是同步分别进行调用。
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
复制代码
可以看到单向是invokeOneway之后就不管返回null了,异步是sendMessageAsync()传入了sendCallback这个回调函数。同步就是sendMessageSync一直等待结果并返回这个结果。至此,消息的发送就结束了。
我们回顾下消息发送的整个流程
结语
我们看到消息发送的几个关键词DefaultMqProducerImpl,topicPublishInfoTable,updateTopicPublicInfoFromNameServer,默认Topic(TBW102),TopicPublishInfo.selectOneMessageQueue以及MQClientAPIImpl.sendMessage()。本文读完后,你可以解答以下问题了
- 第一步获取topic对应路由的过程是怎么的?如果从Nameserver也没有获取到怎么办?
- 当获取到TopicPublicInfo后拿到多个MessageQueue,怎么负载均衡选择一个MessageQueue进行发送的。
- 发送消息大致流程是怎样的?最后网络调用有哪三种方式?