RocketMQ源码之Broker与NameServer

系列文章目录

源码分析之Broker yuanzhicun.blog.csdn.net/article/det…

Rocket源码分析之NameServer yuanzhicun.blog.csdn.net/article/det…

前言

之前的文章已经分析了NameServer的启动创建,以及Broker的创建,4大核心对象等,今天主要分析就是Broker启动后与NameServer的交互,心跳信息

其实理解Broker的源码特别简单,你要集合下面的这个图关注几个点:

1.Broker既然要跟NameServer通信,还要接收生产者消费者的请求,那么它就得有网络服务,所以会集成 NettyServer 和 NettyClient

2.你想客户端每次请求Broker过来肯定不能同步阻塞啊,所以就会有线程池来应对各种请求

3.Broker会有不同的功能,例如高可用,Dleger管理CommitLog, ConsummeQueue等一些也是需要功能的,所以会有不同的功能对应不同的组件

4.一个应用总会有一些定时任务存在,比如 定时发送心跳,定时清理,定期落盘等等需要一些定时后台线程池

带着上面的4个问题,理解这个图,并跟着我的思路去跟踪代码在这里插入图片描述

大家要看着上面的图去理解下面的代码分析就事半功倍了

一、Broker启动

之前以及研究了 Broker启动创建Controller的过程,今天研究创建后,如何启动这个Controller提供服务<font color=#999AAA >” title=”null” loading=”lazy” class=”medium-zoom-image”><strong>controller.start()</strong><img decoding=采用定时器发送注册心跳信息到NameServer在这里插入图片描述

二、向NameServer执行注册

在这里插入图片描述

2.真正执行注册的方法

在这里插入图片描述brokerOuterAPI.registerBrokerAll

    public List<RegisterBrokerResult> registerBrokerAll(        final String clusterName,        final String brokerAddr,        final String brokerName,        final long brokerId,        final String haServerAddr,        final TopicConfigSerializeWrapper topicConfigWrapper,        final List<String> filterServerList,        final boolean oneway,        final int timeoutMills,        final boolean compressed) {        //存放结果        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();        //获取NameServer列表        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {            //封装请求头            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();            requestHeader.setBrokerAddr(brokerAddr);            requestHeader.setBrokerId(brokerId);            requestHeader.setBrokerName(brokerName);            requestHeader.setClusterName(clusterName);            requestHeader.setHaServerAddr(haServerAddr);            requestHeader.setCompressed(compressed);            //封装请求体            RegisterBrokerBody requestBody = new RegisterBrokerBody();            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);            requestBody.setFilterServerList(filterServerList);            final byte[] body = requestBody.encode(compressed);            final int bodyCrc32 = UtilAll.crc32(body);            requestHeader.setBodyCrc32(bodyCrc32);            //通过 CountDownLatch 控制主线程,等其他线程都注册完毕            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());            for (final String namesrvAddr : nameServerAddressList) {                brokerOuterExecutor.execute(new Runnable() {                    @Override                    public void run() {                        try {                            //执行注册                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);                            if (result != null) {                                registerBrokerResultList.add(result);                            }                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);                        } catch (Exception e) {                            log.warn("registerBroker Exception, {}", namesrvAddr, e);                        } finally {                            //-1                            countDownLatch.countDown();                        }                    }                });            }            try {                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);            } catch (InterruptedException e) {            }        }        return registerBrokerResultList;    }
复制代码

上面的代码就是 获取相关信息 封装 header 和 body 通过NettyClient发送给NameServer并获取响应结果。 方法再深入就不带大家看了,里面就是Netty的API的使用。其实理解到这就够了

三、 NameServer如何处理Broker的请求

在这里插入图片描述上面 remotingServer 就是在之前讲的 NameServer创建的NettyServer对象,用于监控连接的在这里插入图片描述在这里插入图片描述在这里插入图片描述基本的流程都给大家穿起来了,你需要做得就是根据我写的内容流程,自己去代码跟踪下就懂了

写在最后,感谢点赞关注收藏转发

欢迎关注我的微信公众号 【猿之村】

来聊聊Java面试 加我的微信进一步交流和学习,微信手动搜索 【codeyuanzhicunup】添加即可 如有相关技术问题欢迎留言探讨,公众号主要用于技术分享,包括常见面试题剖析、以及源码解读、微服务框架、技术热点等。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享