Zookeeper源码学习

版本: 开始看的是3.4.14版本,后面对比了一下3.5.5版本

客户端管理

ZK服务端和客户端通过TCP长连接进行交互,ZK服务端开启TCP server有两种方式:

  1. 基于Netty实现的NIOServerCnxnFactory
  2. 基于Java原生NIO实现的 NIOServerCnxnFactory

需要说明的是,ServerCnxnFactory只用于客户端连接的管理,和其他server之间的交互是没有关系的,这个要搞清楚。线上用的是默认模式,也就是NIOServerCnxnFactory模式,所以接下来主要以分析NIOServerCnxnFactory模式为准

在说连接管理之前,先看看NIOServerCnxnFactory中的线程管理,即:哪些线程用于处理连接请求,哪些请求用于处理读写请求,开启多少个Selector线程等。因为默认情况下,具体的线程数和CPU核心数相关,所以我们这里假设CPU核心数为8:

  1. 连接请求:对应1个线程,即AcceptThread,该线程会持有一个独立的Selector。如果收到连接请求,会将该请求对应的SocketChannel推送到SelectorThread中的阻塞队列,接下来的逻辑又SelectorThread来处理
  2. SelectorThread:线程数为sqrt(numCores/2),所以这里对应2个SelectorThread线程,每个对应一个Selector。该线程有两个作用,一个是处理AcceptThread推送过来的连接请求,将其封装成NIOServerCnxn对象,维护到Map缓存中;另一个作用就是接收读写请求,然后将读写请求封装成IOWorkRequest对象,最后提交到ioWorkers线程池,也就是它只管接收IO请求,然后快速提交到ioWorkers线程池,具体的业务逻辑由ioWorkers线程池处理,所以这个过程也非常快
  3. ioWorkers:线程数为numCores*2,所以这里线程数为16。该线程池用于处理SelectorThread线程提交多来的IO任务,也就是具体的业务逻辑,该线程池类型默认为Executors.newFixedThreadPoolNIOServerCnxn通过SelectionKey#attachment传递
  4. ConnectionExpirerThread:线程数1,对应图中的ClearThread,用于清理已经过期的连接,这里面涉及到ExpiryQueue数据,有关于该数据结构的介绍,参考Session管理章节

image.png

上面其实主要说明了NIOServerCnxnFactory中的线程管理,那客户端的一个连接在NIOServerCnxnFactory中会被抽象成什么?上面简单提到了,会被封装成NIOServerCnxn对象

  1. NIOServerCnxn:可以看成是客户端连接的一个抽象。后面涉及到的一些客户请求处理,都和这个对象息息相关
  2. ipMap: 以客户端IP为key,NIOServerCnxns为value的hash表。一个zk客户端可以和server建立多个长连接,server端会对每个zk客户端的连接数进行限制,默认是是60
  3. cnxnsNIOServerCnxn的Set集合,主要用于4字命令

image.png

Session管理

  1. 客户端第一次IO请求的时候,涉及到NIOServerCnxn对象的初始化,即执行NIOServerCnxn#readConnectRequest方法
  2. 因为是第一次IO请求,此时客户端和server端仅仅是建立了TCP长连接,此时还没有创建Session,所以在该方法中将为该客户端创建一个SessionImpl对象,即ZooKeeperServer#createSession方法
  3. 继续执行SessionTracker#createSession方法,该方法会将创建的Session对象维护到SessionTrackerImpl中的缓存中,同时返回一个sessionId,之后客户端每次请求的时候都要带上该sessionId。在集群模式下,还需要将该Session同步到其他server节点,会更复杂,我们后面再分析
  4. 将创建的sessionId维护到NIOServerCnxn对象中
  5. NIOServerCnxn对象已初始化,然后收到IO请求,则会执行NIOServerCnxn#readRequest方法,进而调用ZooKeeperServer#processPacket => ZooKeeperServer#submitRequest方法。该方法将先调用ZooKeeperServer#touch方法,将SessionTrackerImpl中的session活跃时间更新,如果session或已过期,则会报错;然后将请求交给ZooKeeperServer业务逻辑链进行处理

那么SessionTrackerImpl中的Sessions是如何维护的?它怎么知道哪个Session到期了哪个没到期?

  1. sessionsById:数据结构为key为sessionId, value为SessionImpl的Map
  2. sessionExpiryQueueExpiryQueue<SessionImpl>。有关于ExpiryQueue的数据结构,见下面的图,SessionTrackerImpl即对应下图中的ClearThread,在它的run方法中会不停的清理sessionExpiryQueue中已过期的元素

image.png

  1. ExpiryQueue底层的存储结构为key时间戳,value为Set的Map,需要注意的时候,这里的时间戳可以被tickTime整除,两个key之间至少相隔一个tickTime
  2. 每个格子的长度为tickTimetickTime在ZK中默认为2000
  3. 上面已经说错了,客户端每次请求(包括心跳)过来的时候,server端都需要更新该客户对应的Session的活跃时间,其实就是更新该Session在ExpiryQueue中的格子位置,这将触发ExpiryQueue#update方法
  4. 假如客户端配置的sessionTimeOut为10,当前时间为1618647260000,在执行ExpiryQueue#update方法的时候,首先以((now + sessionTimeOut)/tickTime + 1) * tickTime计算出给session的格子位置,即((1618647260000+10000)/2000 + 1)/2000 = 16186472612000,该公式可以保证计算出来的结果可以被tickTime整除
  5. 根据步骤4的公式可以得出,如果这是一个刚刚创建的Session,那么该Session将会被放到16186472612000格子对应的Set集合中;如果这是一个在ExpiryQueue中已经存在的Session,则会先将该Session移动到16186472612000格子对应的Set集合中,然后从老的Set集合中删除
  6. 上面只说了Session的更新,还有些情况没考虑。假如客户端连接断开了,那么该Sessioin会一直在该Map中不会被清除,这样将导致内存泄漏了,最后可能会导致Server端OOM。除此之外,ZK中的sessionTimeOut是一个非常重要的概念,比如客户端在sessionTimeOut之后又重连上来了,这时候该Session其实是不可用的。那说明还需要有一种失效Session清理机制
  7. 清理逻辑其实很简单,开一个独立的线程,一直以当前时间计算出格子,然后将该格子对应的Set集合为全部清除。根据上面的介绍我们明确的知道,如果一个Session是有效的,它会一直向后移动,当前时间格子对应的Set集合,代表是可以被清除的Sessions

业务逻辑链

上面只是介绍了客户端的连接管理,还没有看到客户端提交的业务请求流程,比如创建一个节点,不过我们大概也可以猜测入口在ioWorker中。而有关于server端的处理逻辑,其实是通过责任链的形式将各个RequestProcessor组织起来的,比如:写事务日志更新DataTree缓存等,在ZK中对应着不同的RequestProcessor。根据zkServer角色的不同,对应的责任链也不一样。

比如在单机模式下,在ZooKeeperServer#startup方法中,会执行RequestProcessor链路初始化的逻辑,如下:

RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
复制代码

下图是各个不同状态的zkServer对应的RequestProcessors
image.png

数据存储

  1. 有关于ZK的数据存储,都和ZKDatabase相关,可以看成是ZK操作数据的门面类,里面包括了ZK以下几种类型的数据:DataTreeFileTxnSnapLogminCommittedLogmaxCommittedLogcommittedLog
  2. DataTree:ZK节点在内存中的表现形式,包括节点路径和节点的Map映射dataWatcheschildWatchesephemerals
  3. 快照文件:快照文件对应ZK截至到某一个时刻的全量数据,文件名为snapshot.{事务ID},其中文件名上的事务ID表示这个文件里面最大的事务ID。每执行50000~100000个事务后,会创建一个snapshot文件。可以根据ZK的文件清理策略对其进行清理
  4. 事务日志文件:对应ZK的写前日志,文件格式为log.{事务ID},其中文件名上的事务ID表示这个文件里面最小的事务ID
  5. 每提交一个事务后,会将该数据提交到DataTree缓存中都i
  6. 每次ZKServer启动之前,都会将快照日志和事务日志加载到DataTree。快照文件的加载顺序是:对所有快照按照事务ID从大到小进行排序,取出其中的前100个,然后依进行加载,如果第一个文件加载成功就直接返回,否则加载第二个,以此类推。快照文件加载之后,即可以得到现阶段的一个最新事务ID,然后根据该事务ID去查找需要加载事务日志文件,如果找到事务日志文件,则会对事务日志进行一一回放,至此有关于数据加载已经完成。有关于根据事务ID找事务文件时涉及到的一些细节,可以参考文件清理章节

Watch机制

  1. watch对应两部分:watch注册watch触发。可以通过ZK提供的几个固定API供进行watch注册,注册的watch会在客户端保存,同时在服务端被标记;watch触发就是说当对应的节点发生变更时,服务端会向客户端派发对应的节点变更事件
  2. 在原生ZK中,提供了以下几个API供我们注册Watch,分别为Zookeeper构造函数Zookeeper#existsZookeeper#getDataZookeeper#getChildrenZookeeper#removeWatches。我们平时使用zkClient的各种subscribexx等方法,都是经过封装过的
  3. 对应的节点变更事件有:创建节点事件删除节点事件更改节点事件子节点列表变化事件对父节点的变更以及孙节点的变更都不会触发watcher,而对watcher本身节点以及子节点的变更会触发watcher

Watch注册

下面我们以Zookeeper#getData为例,对应的目标节点是/zk/test,讲解watch注册的整个流程

  1. 执行Zookeeper#getData方法,假设这里我们传入了一个自己写的Watcher对象
  2. 将Watcher对象封装成DataWatchRegistration对象,将请求体封装成GetDataRequest对象,然后将该消息推送到异步发送请求线程的阻塞队列中
  3. 在该请求返回前,客户端线程会一直阻塞
  4. 客户端收到返回结果后,执行DataWatchRegistration#register方法,该方法主要用于向Zookeeper.ZKWatchManager.dataWatches缓存中添加我们传入的那个Watcher

那么服务端收到watch注册请求会做什么呢?

  1. 服务端收到客户端请求,请求最终会进入FinalRequestProcessor#processRequest方法,然后执行ZKDatabase#getData => DataTree#getData 方法
  2. 先从DataTree中获取节点数据,然后将代表当前客户连接的NIOServerCnxn当作一个Watcher添加到DataTree.dataWatches缓存中,NIOServerCnxn实现了Watcher接口
  3. 我们可以先看看NIOServerCnxn#process方法的逻辑,其实就是创建一个WatcherEvent对象,然后发送给客户端

image.png

Watch触发

下面我们以Zookeeper#setData为例,对应的目标节点是/zk/test,讲解watch触发的整个流程

  1. 以创建节点为例,最终执行到FinalRequestProcessor#processRequest方法,然后执行ZooKeeperServer#processTxn => ZKDatabase#processTxn => DataTree#processTxn 方法
  2. 执行DataTree#setData方法,做一些前置校验,然后给节点赋值
  3. 触发dataWatches.triggerWatch方法,即触发watch,对应的是WatchManager#triggerWatch方法
  4. WatchManager#triggerWatch方法中,先根据path找到watchers,然后将这些watchers从缓存中删除,然后执行Watcher#process方法,和上面对应起来,这里会执行NIOServerCnxn#process方法,即向客户端发送一个WatcherEvent事件

那么客户端收到服务端发送过来的WatcherEvent事件会怎么出列呢?

  1. 在客户端的SendThread#readResponse方法中,解析服务端返回的WatcherEvent事件,然后将该WatcherEvent推送到EventThread.waitingEvents阻塞队列中
  2. EventThread线程不断从EventThread.waitingEvents阻塞队列中拿消息吗,然后执行根据服务端返回的WatcherEvent找到客户端之前注册的Watcher对象,同时将该Watcher从客户端移除,然后执行EventThread#processEvent方法
  3. EventThread#processEvent方法中,会触发客户端的Watcher#process方法,支持,整个流程结束

image.png

单机启动流程

有了上面几个章节的基础,现在简单串起来看看ZK单机模式下的启动流程
image.png

image.png

  1. 从配置文件中加载配置项
  2. 启动文件清理的定时任务
  3. 启动AdminServer
  4. 启动NIOServerCnxnFactory,启动相关的线程,这时候就可以接收客户端连接了
  5. 创建ZkDatabase,并从快照文件和事务日志文件中加载数据到内存
  6. 启动ZooKeeperServer,在这个期间,会启动SessionTracker,用于Session的管理
  7. 初始化RequestProcessor责任链

image.png

单机事务请求

  1. 整体流程参考下图,还有很多细节有待研究
  2. 有关于ZooKeeperServer.outstandingChanges,需要知道的是,只有写请求的时候才会向这个阻塞队列里面推消息。关于它的作用,目前根据它用到的地方,我感觉是和zk的OpCode.multi指令有关:当我们在一次请求中涉及到多个命令,比如先改 -> 再查 -> 再改,当第一次修改之后,写了事务日志,但因为还没提交,所以再内存中看到的还是老数据,通过ZooKeeperServer.outstandingChanges,就可以先将修改后的值写到这个阻塞队列和其独赢的缓存,然后查询的时候就可以看到当前请求修改的数据了。能力有限,有待研究
  3. FinalRequestProcessor#processRequest中,会先执行ZKDatabase#processTxn方法,即内存DataTree缓存。然后更新ZKDatabase.committedLog队列,这个队列主要用于数据同步,基于这个确定数据同步方式

image.png

集群启动流程

  1. 加载配置项及前置校验
  2. 加载数据:从快照文件和事务日志文件中加载数据到内存
  3. 启动NIOServerCnxnFactory,启动相关的线程,这时候就可以接收客户端连接了
  4. 启动AdminServer
  5. 初始化选主用的相关资源,主要包括:QuorumCnxManager.Listener#startFastLeaderElection#start。这里面一共涉及到5个线程,开启了几个server之间选主长连接的端口
  6. 选主: 执行QuorumPeer#run => FastLeaderElection#lookForLeader,这个动作做完之后,将触发各个server之间暗建立选主专用的长连接,让步骤5中的几个线程活动起来了
  7. 步骤6选主完成之后,已经确定了准leader,各个server的状态创建对应的ZookeeperServer,因为根据Server状态的不同,他们的职责是不一样的,所以要区分开来,主要有以下状态:
    1. 如果leader: 创建LeaderLeaderZooKeeperServer
    2. 如果follower: 创建FollowerFollowerZooKeeperServer
    3. 如果observer: 创建ObserverObserverZooKeeperServer
  8. 到这里,整个集群还是不可用的,还要数据同步,数据同步里面包括了很多细节,简单总结一下:
    1. leader启动TCP server,同时计算出当前leader的epoch,然后阻塞等待其他server的epoch同步请求
    2. follower向leader发送epoch同步请求,获取最新的epoch。如果获取成功,follower需要向leader反馈一个epoch ACK
    3. leader收到超过半数的epoch ACK,从epoch ACK上获取该follower最新的zxid,根据该zxid和本地的commitLog计算数据同步方式,然后将同步请求发送到其他follower节点
    4. follower接收同步数据,然后然后进行数据同步
    5. leader向follower发送NEWLEADER请求,等待超过半数follower返回NEWLEADER ACK
    6. leader收到半数NEWLEADER ACK后就启动LeaderZooKeeperServer,然后通过异步的方式向follower发送UPTODATE,开始正常接收请求
    7. follower收到UPTODATE之后就启动FollowerZooKeeperServer,开始正常接收请求
  9. 至此,整个集群启动成功,开始正常服务

image.png
image.png

选主交互

  1. QuorumPeer线程启动之后,先通过FastLeaderElection#lookForLeader选主,在选主完成之前,该线程会一直阻塞
  2. 初始状态为LOOKING,先将代表自己的选票发送出去:每个sid对应一条ToSend消息,然后将这些消息推送到FastLeaderElection.sendqueue队列。如果此时没有和其他server建立连接,则在发送之前会建立TCP长连接
  3. 步骤2说到了建立连接,但什么时候开启选举server端呢?其实在QuorumPeer线程启动之前server端已经开启了,对应QuorumCnxManager.Listener线程逻辑
  4. FastLeaderElection.WorkerSender线程会一直从FastLeaderElection.sendqueue队列拿消息,推送到QuorumCnxManager#sendQueue队列
  5. QuorumCnxManager.SendWorker线程会一直从QuorumCnxManager#sendQueue队列拿消息,通过TCP连接发送到其他server
  6. QuorumCnxManager.RecvWorker线程会一直从socker中读取其他server发过来的选票,然后将选票信息放到QuorumCnxManager#recvQueue队列中
  7. FastLeaderElection.WorkerReceiver线程会一直从QuorumCnxManager#recvQueue队列拿消息,然后进行判断:1. 如果不是一个有效的server节点(根据sid判断),则直接发送到FastLeaderElection.sendqueue队列,这将导致该信息直接返回server;如果当前节点状态为LOOKING,则先将该选票发送到FastLeaderElection.recvqueue队列,如果收到的选票Epoch比当前机器的Epoch小,则将当前机器的选票发送到FastLeaderElection.sendqueue队列;如果当前节点状态不为LOOKING,则将当前机器的选票发送到FastLeaderElection.sendqueue队列
  8. QuorumPeer线程中,选主逻辑会不断的FastLeaderElection.recvqueue队列中拿消息,然后进行PK。然后再重复执行上面的流程,直到选主完成

image.png

image.png

image.png

image.png

数据同步

image.png

集群事务请求

image.png

文件清理

  1. 从配置文件中获取autopurge.purgeInterval的值,表示每几个小时执行一次文件清理
  2. 如果autopurge.purgeInterval的值大于0,则创建一个每autopurge.purgeInterval小时执行一次的定时任务,任务逻辑即为清理逻辑
  3. 从配置文件中读取autopurge.snapRetainCount的值,该值表示保留最近的多少分快照文件,该值最小限制为3,以下用n代表该值
  4. 对snap文件进行降序排序,找出最新的n个snap文件
  5. n个snap文件文件中,取最小的那个文件,然后根据该文件名找到leastZxidToBeRetain,表示小于leastZxidToBeRetain的快照文件和对应得事务日志文件都可以被删除
  6. 步骤5有一个需要注意的地方,理想情况下,snap.leastZxidToBeRetain文件对应的下一个事务日志文件为log.leastZxidToBeRetain+1,但很有可能文件log.leastZxidToBeRetain+1不存在,同时log.leastZxidToBeRetain-a文件存在(a>0),为了保险起见,此时我们需要依赖log.leastZxidToBeRetain-a文件才能完成对文件snap.leastZxidToBeRetain文件的恢复,这说明snap.leastZxidToBeRetain文件可以山粗的时候,log.leastZxidToBeRetain-a文件不一定可以删除,这得分情况
  7. 基于步骤6分析得,如果有log.leastZxidToBeRetain事务日志文件,则返回包括该事务日志文件在内的后面的所有事务日志文件;否则返回包括前一个事务日志文件在内的后面的所有事务日志文件。已知log.70 log.81 log.100:假如leastZxidToBeRetain为80,则返回log.70 log.81 log.100;假如leastZxidToBeRetain为81,则返回log.81 log.100。按理来说,leastZxidToBeRetain为80的时候,也应只返回log.81 log.100,因为事务ID80是可以被删除的,那就代表下一个事务ID是从81开始,但按照代码逻辑却会返回log.70 log.81 log.100找出的这几个事务日志文件代表不能删除,假设这个文件列表为Q
  8. 删除不在文件列表Q内的事务日志文件
  9. 删除<=leastZxidToBeRetain的快照文件

image.png

image.png

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享