版本: 开始看的是3.4.14版本,后面对比了一下3.5.5版本
客户端管理
ZK服务端和客户端通过TCP长连接进行交互,ZK服务端开启TCP server有两种方式:
- 基于Netty实现的
NIOServerCnxnFactory
- 基于Java原生NIO实现的
NIOServerCnxnFactory
需要说明的是,ServerCnxnFactory
只用于客户端连接的管理,和其他server之间的交互是没有关系的,这个要搞清楚。线上用的是默认模式,也就是NIOServerCnxnFactory
模式,所以接下来主要以分析NIOServerCnxnFactory
模式为准
在说连接管理之前,先看看NIOServerCnxnFactory
中的线程管理,即:哪些线程用于处理连接请求
,哪些请求用于处理读写请求
,开启多少个Selector
线程等。因为默认情况下,具体的线程数和CPU核心数相关,所以我们这里假设CPU核心数为8
:
- 连接请求:对应
1
个线程,即AcceptThread
,该线程会持有一个独立的Selector
。如果收到连接请求
,会将该请求对应的SocketChannel
推送到SelectorThread中的阻塞队列
,接下来的逻辑又SelectorThread
来处理 SelectorThread
:线程数为sqrt(numCores/2)
,所以这里对应2个SelectorThread
线程,每个对应一个Selector
。该线程有两个作用,一个是处理AcceptThread
推送过来的连接请求
,将其封装成NIOServerCnxn
对象,维护到Map缓存中;另一个作用就是接收读写请求
,然后将读写请求封装成IOWorkRequest
对象,最后提交到ioWorkers
线程池,也就是它只管接收IO请求,然后快速提交到ioWorkers
线程池,具体的业务逻辑由ioWorkers
线程池处理,所以这个过程也非常快ioWorkers
:线程数为numCores*2
,所以这里线程数为16。该线程池用于处理SelectorThread
线程提交多来的IO任务,也就是具体的业务逻辑,该线程池类型默认为Executors.newFixedThreadPool
。NIOServerCnxn
通过SelectionKey#attachment
传递ConnectionExpirerThread
:线程数1,对应图中的ClearThread
,用于清理已经过期的连接,这里面涉及到ExpiryQueue
数据,有关于该数据结构的介绍,参考Session管理
章节
上面其实主要说明了NIOServerCnxnFactory
中的线程管理,那客户端的一个连接在NIOServerCnxnFactory
中会被抽象成什么?上面简单提到了,会被封装成NIOServerCnxn
对象
NIOServerCnxn
:可以看成是客户端连接的一个抽象。后面涉及到的一些客户请求处理,都和这个对象息息相关ipMap
: 以客户端IP为key,NIOServerCnxns为value
的hash表。一个zk客户端可以和server建立多个长连接,server端会对每个zk客户端的连接数进行限制,默认是是60
cnxns
:NIOServerCnxn
的Set集合,主要用于4字命令
Session管理
- 客户端第一次IO请求的时候,涉及到
NIOServerCnxn
对象的初始化,即执行NIOServerCnxn#readConnectRequest
方法 - 因为是第一次IO请求,此时客户端和server端仅仅是建立了TCP长连接,此时还没有创建
Session
,所以在该方法中将为该客户端创建一个SessionImpl
对象,即ZooKeeperServer#createSession
方法 - 继续执行
SessionTracker#createSession
方法,该方法会将创建的Session
对象维护到SessionTrackerImpl
中的缓存中,同时返回一个sessionId
,之后客户端每次请求的时候都要带上该sessionId
。在集群模式下,还需要将该Session同步到其他server节点,会更复杂,我们后面再分析 - 将创建的
sessionId
维护到NIOServerCnxn
对象中 NIOServerCnxn
对象已初始化,然后收到IO请求,则会执行NIOServerCnxn#readRequest
方法,进而调用ZooKeeperServer#processPacket => ZooKeeperServer#submitRequest
方法。该方法将先调用ZooKeeperServer#touch
方法,将SessionTrackerImpl
中的session活跃时间更新,如果session或已过期,则会报错;然后将请求交给ZooKeeperServer
业务逻辑链进行处理
那么SessionTrackerImpl
中的Sessions是如何维护的?它怎么知道哪个Session到期了哪个没到期?
sessionsById
:数据结构为key为sessionId, value为SessionImpl的
MapsessionExpiryQueue
:ExpiryQueue<SessionImpl>
。有关于ExpiryQueue
的数据结构,见下面的图,SessionTrackerImpl
即对应下图中的ClearThread
,在它的run
方法中会不停的清理sessionExpiryQueue
中已过期的元素
ExpiryQueue
底层的存储结构为key时间戳,value为Set
的Map,需要注意的时候,这里的时间戳可以被tickTime
整除,两个key之间至少相隔一个tickTime
- 每个格子的长度为
tickTime
,tickTime
在ZK中默认为2000 - 上面已经说错了,客户端每次请求(包括心跳)过来的时候,server端都需要更新
该客户对应的Session的活跃时间
,其实就是更新该Session在ExpiryQueue
中的格子位置,这将触发ExpiryQueue#update
方法 - 假如客户端配置的
sessionTimeOut
为10,当前时间为1618647260000,在执行ExpiryQueue#update
方法的时候,首先以((now + sessionTimeOut)/tickTime + 1) * tickTime
计算出给session的格子位置,即((1618647260000+10000)/2000 + 1)/2000 = 16186472612000
,该公式可以保证计算出来的结果可以被tickTime
整除 - 根据步骤4的公式可以得出,如果这是一个刚刚创建的Session,那么该Session将会被放到
16186472612000格子
对应的Set集合中;如果这是一个在ExpiryQueue
中已经存在的Session,则会先将该Session移动到16186472612000格子
对应的Set集合中,然后从老的Set集合中删除 - 上面只说了Session的更新,还有些情况没考虑。假如客户端连接断开了,那么该Sessioin会一直在该Map中不会被清除,这样将导致内存泄漏了,最后可能会导致Server端OOM。除此之外,ZK中的
sessionTimeOut
是一个非常重要的概念,比如客户端在sessionTimeOut
之后又重连上来了,这时候该Session其实是不可用的。那说明还需要有一种失效Session清理机制 - 清理逻辑其实很简单,开一个独立的线程,一直以当前时间计算出格子,然后将该格子对应的Set集合为全部清除。根据上面的介绍我们明确的知道,如果一个Session是有效的,它会一直向后移动,当前时间格子对应的Set集合,代表是可以被清除的Sessions
业务逻辑链
上面只是介绍了客户端的连接管理,还没有看到客户端提交的业务请求流程,比如创建一个节点
,不过我们大概也可以猜测入口在ioWorker
中。而有关于server端的处理逻辑,其实是通过责任链的形式将各个RequestProcessor
组织起来的,比如:写事务日志
、更新DataTree缓存
等,在ZK中对应着不同的RequestProcessor
。根据zkServer角色的不同,对应的责任链也不一样。
比如在单机模式下,在ZooKeeperServer#startup
方法中,会执行RequestProcessor
链路初始化的逻辑,如下:
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
复制代码
下图是各个不同状态的zkServer对应的RequestProcessors
数据存储
- 有关于ZK的数据存储,都和
ZKDatabase
相关,可以看成是ZK操作数据的门面类,里面包括了ZK以下几种类型的数据:DataTree
、FileTxnSnapLog
、minCommittedLog
、maxCommittedLog
、committedLog
等 - DataTree:ZK节点在内存中的表现形式,包括
节点路径和节点的Map映射
、dataWatches
、childWatches
、ephemerals
等 - 快照文件:快照文件对应ZK截至到某一个时刻的全量数据,文件名为
snapshot.{事务ID}
,其中文件名上的事务ID表示这个文件里面最大的事务ID
。每执行50000~100000
个事务后,会创建一个snapshot文件。可以根据ZK的文件清理策略对其进行清理 - 事务日志文件:对应ZK的写前日志,文件格式为
log.{事务ID}
,其中文件名上的事务ID表示这个文件里面最小的事务ID
- 每提交一个事务后,会将该数据提交到
DataTree
缓存中都i - 每次ZKServer启动之前,都会将快照日志和事务日志加载到
DataTree
。快照文件的加载顺序是:对所有快照按照事务ID从大到小进行排序,取出其中的前100个,然后依进行加载,如果第一个文件加载成功就直接返回,否则加载第二个,以此类推。快照文件加载之后,即可以得到现阶段的一个最新事务ID,然后根据该事务ID去查找需要加载事务日志文件,如果找到事务日志文件,则会对事务日志进行一一回放,至此有关于数据加载已经完成。有关于根据事务ID找事务文件时涉及到的一些细节,可以参考文件清理章节
Watch机制
- watch对应两部分:
watch注册
和watch触发
。可以通过ZK提供的几个固定API供进行watch注册
,注册的watch会在客户端保存,同时在服务端被标记;watch触发
就是说当对应的节点发生变更时,服务端会向客户端派发对应的节点变更事件 - 在原生ZK中,提供了以下几个API供我们注册
Watch
,分别为Zookeeper构造函数
、Zookeeper#exists
、Zookeeper#getData
、Zookeeper#getChildren
、Zookeeper#removeWatches
。我们平时使用zkClient的各种subscribexx
等方法,都是经过封装过的 - 对应的节点变更事件有:
创建节点事件
、删除节点事件
、更改节点事件
、子节点列表变化事件
,对父节点的变更以及孙节点的变更都不会触发watcher,而对watcher本身节点以及子节点的变更会触发watcher
Watch注册
下面我们以Zookeeper#getData
为例,对应的目标节点是/zk/test
,讲解watch注册
的整个流程
- 执行
Zookeeper#getData
方法,假设这里我们传入了一个自己写的Watcher对象 - 将Watcher对象封装成
DataWatchRegistration
对象,将请求体封装成GetDataRequest
对象,然后将该消息推送到异步发送请求线程的阻塞队列中
- 在该请求返回前,客户端线程会一直阻塞
- 客户端收到返回结果后,执行
DataWatchRegistration#register
方法,该方法主要用于向Zookeeper.ZKWatchManager.dataWatches
缓存中添加我们传入的那个Watcher
那么服务端收到watch注册
请求会做什么呢?
- 服务端收到客户端请求,请求最终会进入
FinalRequestProcessor#processRequest
方法,然后执行ZKDatabase#getData => DataTree#getData
方法 - 先从DataTree中获取节点数据,然后将
代表当前客户连接的NIOServerCnxn
当作一个Watcher添加到DataTree.dataWatches
缓存中,NIOServerCnxn实现了Watcher接口 - 我们可以先看看
NIOServerCnxn#process
方法的逻辑,其实就是创建一个WatcherEvent
对象,然后发送给客户端
Watch触发
下面我们以Zookeeper#setData
为例,对应的目标节点是/zk/test
,讲解watch触发
的整个流程
- 以创建节点为例,最终执行到
FinalRequestProcessor#processRequest
方法,然后执行ZooKeeperServer#processTxn => ZKDatabase#processTxn => DataTree#processTxn
方法 - 执行
DataTree#setData
方法,做一些前置校验,然后给节点赋值 - 触发
dataWatches.triggerWatch
方法,即触发watch,对应的是WatchManager#triggerWatch
方法 - 在
WatchManager#triggerWatch
方法中,先根据path找到watchers,然后将这些watchers从缓存中删除,然后执行Watcher#process
方法,和上面对应起来,这里会执行NIOServerCnxn#process
方法,即向客户端发送一个WatcherEvent
事件
那么客户端收到服务端发送过来的WatcherEvent
事件会怎么出列呢?
- 在客户端的
SendThread#readResponse
方法中,解析服务端返回的WatcherEvent
事件,然后将该WatcherEvent
推送到EventThread.waitingEvents
阻塞队列中 EventThread
线程不断从EventThread.waitingEvents
阻塞队列中拿消息吗,然后执行根据服务端返回的WatcherEvent
找到客户端之前注册的Watcher对象,同时将该Watcher从客户端移除,然后执行EventThread#processEvent
方法- 在
EventThread#processEvent
方法中,会触发客户端的Watcher#process
方法,支持,整个流程结束
单机启动流程
有了上面几个章节的基础,现在简单串起来看看ZK单机模式下的启动流程
- 从配置文件中加载配置项
- 启动文件清理的定时任务
- 启动AdminServer
- 启动
NIOServerCnxnFactory
,启动相关的线程,这时候就可以接收客户端连接了 - 创建
ZkDatabase
,并从快照文件和事务日志文件中加载数据到内存 - 启动
ZooKeeperServer
,在这个期间,会启动SessionTracker
,用于Session的管理 - 初始化
RequestProcessor
责任链
单机事务请求
- 整体流程参考下图,还有很多细节有待研究
- 有关于
ZooKeeperServer.outstandingChanges
,需要知道的是,只有写请求的时候才会向这个阻塞队列里面推消息。关于它的作用,目前根据它用到的地方,我感觉是和zk的OpCode.multi
指令有关:当我们在一次请求中涉及到多个命令,比如先改 -> 再查 -> 再改
,当第一次修改之后,写了事务日志,但因为还没提交,所以再内存中看到的还是老数据,通过ZooKeeperServer.outstandingChanges
,就可以先将修改后的值写到这个阻塞队列和其独赢的缓存,然后查询的时候就可以看到当前请求修改的数据了。能力有限,有待研究 - 在
FinalRequestProcessor#processRequest
中,会先执行ZKDatabase#processTxn
方法,即内存DataTree缓存。然后更新ZKDatabase.committedLog
队列,这个队列主要用于数据同步,基于这个确定数据同步方式
集群启动流程
- 加载配置项及前置校验
- 加载数据:从快照文件和事务日志文件中加载数据到内存
- 启动
NIOServerCnxnFactory
,启动相关的线程,这时候就可以接收客户端连接了 - 启动AdminServer
- 初始化选主用的相关资源,主要包括:
QuorumCnxManager.Listener#start
、FastLeaderElection#start
。这里面一共涉及到5个线程,开启了几个server之间选主长连接的端口 - 选主: 执行
QuorumPeer#run => FastLeaderElection#lookForLeader
,这个动作做完之后,将触发各个server之间暗建立选主专用的长连接,让步骤5中的几个线程活动起来了 - 步骤6选主完成之后,已经确定了
准leader
,各个server的状态创建对应的ZookeeperServer,因为根据Server状态的不同,他们的职责是不一样的,所以要区分开来,主要有以下状态:- 如果
leader
: 创建Leader
和LeaderZooKeeperServer
- 如果
follower
: 创建Follower
和FollowerZooKeeperServer
- 如果
observer
: 创建Observer
和ObserverZooKeeperServer
- 如果
- 到这里,整个集群还是不可用的,还要数据同步,数据同步里面包括了很多细节,简单总结一下:
- leader启动
TCP server
,同时计算出当前leader的epoch
,然后阻塞等待其他server的epoch同步请求
- follower向leader发送
epoch同步请求
,获取最新的epoch
。如果获取成功,follower需要向leader反馈一个epoch ACK
- leader收到超过半数的
epoch ACK
,从epoch ACK
上获取该follower最新的zxid
,根据该zxid
和本地的commitLog
计算数据同步方式,然后将同步请求发送到其他follower节点 - follower接收同步数据,然后然后进行数据同步
- leader向follower发送
NEWLEADER
请求,等待超过半数follower返回NEWLEADER ACK
- leader收到半数
NEWLEADER ACK
后就启动LeaderZooKeeperServer
,然后通过异步的方式向follower发送UPTODATE
,开始正常接收请求 - follower收到
UPTODATE
之后就启动FollowerZooKeeperServer
,开始正常接收请求
- leader启动
- 至此,整个集群启动成功,开始正常服务
选主交互
QuorumPeer
线程启动之后,先通过FastLeaderElection#lookForLeader
选主,在选主完成之前,该线程会一直阻塞- 初始状态为
LOOKING
,先将代表自己的选票发送出去:每个sid对应一条ToSend
消息,然后将这些消息推送到FastLeaderElection.sendqueue
队列。如果此时没有和其他server建立连接,则在发送之前会建立TCP长连接 - 步骤2说到了建立连接,但什么时候开启选举server端呢?其实在
QuorumPeer
线程启动之前server端已经开启了,对应QuorumCnxManager.Listener
线程逻辑 FastLeaderElection.WorkerSender
线程会一直从FastLeaderElection.sendqueue
队列拿消息,推送到QuorumCnxManager#sendQueue
队列QuorumCnxManager.SendWorker
线程会一直从QuorumCnxManager#sendQueue
队列拿消息,通过TCP连接发送到其他serverQuorumCnxManager.RecvWorker
线程会一直从socker中读取其他server发过来的选票,然后将选票信息放到QuorumCnxManager#recvQueue
队列中FastLeaderElection.WorkerReceiver
线程会一直从QuorumCnxManager#recvQueue
队列拿消息,然后进行判断:1. 如果不是一个有效的server节点(根据sid判断),则直接发送到FastLeaderElection.sendqueue
队列,这将导致该信息直接返回server;如果当前节点状态为LOOKING
,则先将该选票发送到FastLeaderElection.recvqueue
队列,如果收到的选票Epoch比当前机器的Epoch小,则将当前机器的选票发送到FastLeaderElection.sendqueue
队列;如果当前节点状态不为LOOKING
,则将当前机器的选票发送到FastLeaderElection.sendqueue
队列- 在
QuorumPeer
线程中,选主逻辑会不断的FastLeaderElection.recvqueue
队列中拿消息,然后进行PK。然后再重复执行上面的流程,直到选主完成
数据同步
集群事务请求
文件清理
- 从配置文件中获取
autopurge.purgeInterval
的值,表示每几个小时执行一次文件清理 - 如果
autopurge.purgeInterval
的值大于0,则创建一个每autopurge.purgeInterval
小时执行一次的定时任务,任务逻辑即为清理逻辑 - 从配置文件中读取
autopurge.snapRetainCount
的值,该值表示保留最近的多少分快照文件,该值最小限制为3,以下用n
代表该值 - 对snap文件进行降序排序,找出最新的
n
个snap文件 - 从
n
个snap文件文件中,取最小的那个文件,然后根据该文件名找到leastZxidToBeRetain
,表示小于leastZxidToBeRetain
的快照文件和对应得事务日志文件都可以被删除 - 步骤5有一个需要注意的地方,理想情况下,
snap.leastZxidToBeRetain
文件对应的下一个事务日志文件为log.leastZxidToBeRetain+1
,但很有可能文件log.leastZxidToBeRetain+1
不存在,同时log.leastZxidToBeRetain-a
文件存在(a>0),为了保险起见,此时我们需要依赖log.leastZxidToBeRetain-a
文件才能完成对文件snap.leastZxidToBeRetain
文件的恢复,这说明snap.leastZxidToBeRetain
文件可以山粗的时候,log.leastZxidToBeRetain-a
文件不一定可以删除,这得分情况 - 基于步骤6分析得,如果有
log.leastZxidToBeRetain
事务日志文件,则返回包括该事务日志文件在内的后面的所有事务日志文件;否则返回包括前一个事务日志文件在内的后面的所有事务日志文件。已知log.70 log.81 log.100
:假如leastZxidToBeRetain
为80,则返回log.70 log.81 log.100
;假如leastZxidToBeRetain
为81,则返回log.81 log.100
。按理来说,leastZxidToBeRetain
为80的时候,也应只返回log.81 log.100
,因为事务ID80
是可以被删除的,那就代表下一个事务ID是从81开始,但按照代码逻辑却会返回log.70 log.81 log.100
。找出的这几个事务日志文件代表不能删除,假设这个文件列表为Q - 删除不在文件列表
Q
内的事务日志文件 - 删除
<=leastZxidToBeRetain
的快照文件