系列文章目录
源码分析之Broker yuanzhicun.blog.csdn.net/article/det…
Rocket源码分析之NameServer yuanzhicun.blog.csdn.net/article/det…
前言
之前的文章已经分析了NameServer的启动创建,以及Broker的创建,4大核心对象等,今天主要分析就是Broker启动后与NameServer的交互,心跳信息
其实理解Broker的源码特别简单,你要集合下面的这个图关注几个点:
1.Broker既然要跟NameServer通信,还要接收生产者消费者的请求,那么它就得有网络服务,所以会集成 NettyServer 和 NettyClient
2.你想客户端每次请求Broker过来肯定不能同步阻塞啊,所以就会有线程池来应对各种请求
3.Broker会有不同的功能,例如高可用,Dleger管理CommitLog, ConsummeQueue等一些也是需要功能的,所以会有不同的功能对应不同的组件
4.一个应用总会有一些定时任务存在,比如 定时发送心跳,定时清理,定期落盘等等需要一些定时后台线程池
带着上面的4个问题,理解这个图,并跟着我的思路去跟踪代码
大家要看着上面的图去理解下面的代码分析就事半功倍了
一、Broker启动
之前以及研究了 Broker启动创建Controller的过程,今天研究创建后,如何启动这个Controller提供服务采用定时器发送注册心跳信息到NameServer
二、向NameServer执行注册
2.真正执行注册的方法
brokerOuterAPI.registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { //存放结果 final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); //获取NameServer列表 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { //封装请求头 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); //封装请求体 RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); //通过 CountDownLatch 控制主线程,等其他线程都注册完毕 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { //执行注册 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { //-1 countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
复制代码
上面的代码就是 获取相关信息 封装 header 和 body 通过NettyClient发送给NameServer并获取响应结果。 方法再深入就不带大家看了,里面就是Netty的API的使用。其实理解到这就够了
三、 NameServer如何处理Broker的请求
上面 remotingServer 就是在之前讲的 NameServer创建的NettyServer对象,用于监控连接的
基本的流程都给大家穿起来了,你需要做得就是根据我写的内容流程,自己去代码跟踪下就懂了
写在最后,感谢点赞关注收藏转发
欢迎关注我的微信公众号 【猿之村】
来聊聊Java面试 加我的微信进一步交流和学习,微信手动搜索 【codeyuanzhicunup】添加即可 如有相关技术问题欢迎留言探讨,公众号主要用于技术分享,包括常见面试题剖析、以及源码解读、微服务框架、技术热点等。