- First, locate the server's startup class, QuorumPeerMain.
QuorumPeerMain.java
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ...
    }
    ...
}
The main() method is very simple; the real logic lives in main.initializeAndRun(args).
- The server can start in one of two modes: standalone or cluster. Next, look at the initializeAndRun(args) method.
QuorumPeerMain.java
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // Configuration object backed by zoo.cfg
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();
    // Cluster mode
    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        // Otherwise start directly in standalone mode
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
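This method does three things:
- Parses the zoo.cfg configuration file
- Starts and schedules the purge task
- Decides from the configuration whether to start in standalone or cluster mode.
The mode decision comes from config.isDistributed():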
public boolean isDistributed() {
    return quorumVerifier != null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
}
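In effect, this method checks how many server.N entries (such as server.1=127.0.0.1:2888:3888) zoo.cfg defines: with more than one voting member, or with standaloneEnabled set to false, the server runs in cluster mode.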
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
The configuration above has 3 entries, that is, 3 nodes, so the server starts in cluster mode.
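The decision described above can be sketched in plain Java. This is only an illustration under stated assumptions: StartupModeSketch, countServerEntries, and the int-based isDistributed are hypothetical names, not ZooKeeper APIs, and the sketch counts every server.N line, whereas the real check counts only voting members of the quorumVerifier.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of ZooKeeper: mimics the standalone-vs-cluster decision.
class StartupModeSketch {

    // Counts the "server.N" keys in zoo.cfg-style text.
    static int countServerEntries(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int count = 0;
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                count++;
            }
        }
        return count;
    }

    // Simplified assumption: "no server entries" stands in for a null quorumVerifier,
    // and every entry is treated as a voting member.
    static boolean isDistributed(int serverEntries, boolean standaloneEnabled) {
        return serverEntries > 0 && (!standaloneEnabled || serverEntries > 1);
    }

    public static void main(String[] args) {
        String cfg = "server.1=127.0.0.1:2888:3888\n"
                   + "server.2=127.0.0.1:2889:3889\n"
                   + "server.3=127.0.0.1:2890:3890\n";
        int entries = countServerEntries(cfg);
        System.out.println(entries + " entries, distributed = " + isDistributed(entries, true));
    }
}
```

With a single server entry and standaloneEnabled left at true, the sketch falls back to standalone mode, which matches the else branch of initializeAndRun.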
Let's first look at how standalone mode starts.
- Starting in standalone mode
ZooKeeperServerMain.java
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ...
    }
    ...
}
The main logic is in the initializeAndRun(args) method, shown next.
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }
    // Parse the zoo.cfg configuration file (or the raw command-line arguments)
    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }
    // Start running
    runFromConfig(config);
}
This step also just parses the configuration and hands it to the next step, runFromConfig(config).
public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        ...
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        // Create the file-based transaction log and snapshot store
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        ...
        // Create the ZooKeeperServer
        final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog,
            config.tickTime, config.minSessionTimeout, config.maxSessionTimeout,
            config.listenBacklog, null, config.initialConfig);
        txnLog.setServerStats(zkServer.serverStats());
        // Registers shutdown handler which will be used to know the
        // server error or shutdown state changes.
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));
        // Start Admin server
        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();
        boolean needStartZKServer = true;
        // Initialize server-side connection management
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }
        ...
        // Watch status of ZooKeeper server. It will do a graceful shutdown
        // if the server is not running or hits an internal error.
        // Block here until the shutdown latch is released
        shutdownLatch.await();
        shutdown();
        ...
    } catch (InterruptedException e) {
        ...
    } finally {
        ...
    }
}
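The way runFromConfig parks the main thread on shutdownLatch.await() until a handler releases it can be sketched with the JDK alone. ToyServer, signalShutdown, and awaitShutdown are hypothetical names used only for illustration; the sketch also uses a timed await so that it always terminates, whereas the listing above waits without a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the "block on a latch until shutdown is signalled" pattern.
class ShutdownLatchSketch {

    // Hypothetical stand-in for a server that reports its shutdown through a latch.
    static final class ToyServer {
        private final CountDownLatch shutdownLatch;

        ToyServer(CountDownLatch shutdownLatch) {
            this.shutdownLatch = shutdownLatch;
        }

        // Called when the server stops or hits an error; releases the waiting thread.
        void signalShutdown() {
            shutdownLatch.countDown();
        }
    }

    // Returns true if the shutdown signal arrived before the timeout elapsed.
    static boolean awaitShutdown(long stopAfterMillis, long timeoutMillis) {
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        ToyServer server = new ToyServer(shutdownLatch);

        // Simulate the server stopping on another thread after a short delay.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(stopAfterMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            server.signalShutdown();
        });
        worker.setDaemon(true);
        worker.start();

        try {
            // The calling thread does nothing else; it simply waits here.
            return shutdownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown observed: " + awaitShutdown(100, 5000));
    }
}
```

A latch with a count of 1 fits here because the main thread only needs a one-shot "the server is done" signal, after which it can run its cleanup.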
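The listing of runFromConfig omits some less important steps; the main ones that remain are:
- Create the file-based transaction log and snapshot store
- Create the ZooKeeperServer
- Initialize server-side connection management
- Next, focus on how server-side connection management is initialized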
if (config.getClientPortAddress() != null) {
    // Create the connection factory
    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
    // Start it
    cnxnFactory.startup(zkServer);
    // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
    needStartZKServer = false;
}
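ServerCnxnFactory.java
This class has two implementations: one based on native NIO and one based on Netty.
ServerCnxnFactory#createFactory() uses the native NIO implementation by default; the Netty implementation can be selected by setting a system property.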
public static ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
            .getDeclaredConstructor()
            .newInstance();
        ...
    } catch (Exception e) {
        ...
    }
}
The rest of this analysis is based on the Netty implementation, NettyServerCnxnFactory.
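The selection mechanism in createFactory() (read a class name from a system property, fall back to a default, instantiate reflectively) can be sketched as follows. Everything here is a stand-in: ConnectionFactory, NioFactory, NettyFactory, and the sketch.cnxnFactory property name are hypothetical and exist only for this illustration.

```java
// Sketch of "pick an implementation by system property, instantiate by reflection".
class CnxnFactorySketch {

    // Hypothetical stand-in for the abstract factory type.
    interface ConnectionFactory {
        String name();
    }

    static class NioFactory implements ConnectionFactory {
        public String name() {
            return "nio";
        }
    }

    static class NettyFactory implements ConnectionFactory {
        public String name() {
            return "netty";
        }
    }

    // Hypothetical property name, chosen for this sketch only.
    static final String FACTORY_PROPERTY = "sketch.cnxnFactory";

    static ConnectionFactory createFactory() {
        String className = System.getProperty(FACTORY_PROPERTY);
        if (className == null) {
            // No override given: use the default implementation.
            className = NioFactory.class.getName();
        }
        try {
            return (ConnectionFactory) Class.forName(className)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("default: " + createFactory().name());
        System.setProperty(FACTORY_PROPERTY, NettyFactory.class.getName());
        System.out.println("override: " + createFactory().name());
    }
}
```

In ZooKeeper itself, the property behind ZOOKEEPER_SERVER_CNXN_FACTORY is zookeeper.serverCnxnFactory, so the Netty implementation is typically chosen by starting the JVM with -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory.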
- Starting the server
NettyServerCnxnFactory.java
Start with the NettyServerCnxnFactory constructor.
NettyServerCnxnFactory() {
    x509Util = new ClientX509Util();
    ...
    EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(NettyUtils.getClientReachableLocalInetAddressCount());
    EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
        .channel(NettyUtils.nioOrEpollServerSocketChannel())
        // parent channel options
        .option(ChannelOption.SO_REUSEADDR, true)
        // child channels options
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_LINGER, -1)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                ...
                // Important: this is the main handler
                pipeline.addLast("servercnxnfactory", channelHandler);
            }
        });
    ...
}
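This is a typical Netty ServerBootstrap setup. The main business logic lives in the channelHandler handler (CnxnChannelHandler.java), which will be covered when analyzing how the server handles client requests.
The cnxnFactory.startup(zkServer) call in runFromConfig delegates to the two-argument overload with startServer set to true.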
NettyServerCnxnFactory#startup()
@Override
public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
    start();
    setZooKeeperServer(zks);
    if (startServer) {
        zks.startdata();
        zks.startup();
    }
}
NettyServerCnxnFactory#start()
@Override
public void start() {
    if (listenBacklog != -1) {
        bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
    }
    LOG.info("binding to port {}", localAddress);
    // Bind the port
    parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
    // Port changes after bind() if the original port was 0, update
    // localAddress to get the real port.
    localAddress = (InetSocketAddress) parentChannel.localAddress();
    LOG.info("bound to port {}", getLocalPort());
}
Binding the port is what actually starts the server listening:
bootstrap.bind(localAddress).syncUninterruptibly().channel();
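The comment in start() about the port changing after bind() when the requested port is 0 can be demonstrated without Netty. The sketch below swaps in the JDK's ServerSocketChannel purely for illustration; BindSketch and bindAndGetPort are hypothetical names, and binding to the loopback address is an assumption made to keep the example local.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Sketch: bind a listening socket, then read back the address that was really bound.
class BindSketch {

    // Binds to the requested port on loopback and returns the port actually in use.
    // A backlog of 0 or less lets the platform pick its default.
    static int bindAndGetPort(int requestedPort, int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            InetSocketAddress requested =
                new InetSocketAddress(InetAddress.getLoopbackAddress(), requestedPort);
            server.bind(requested, backlog);
            // With requestedPort == 0 the OS assigns an ephemeral port,
            // so the local address must be re-read after bind().
            InetSocketAddress actual = (InetSocketAddress) server.getLocalAddress();
            return actual.getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("requested port 0, bound to port " + bindAndGetPort(0, 50));
    }
}
```

This is why start() overwrites localAddress after binding: code that later reports or reuses the port needs the real value, not the 0 that was requested.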
NettyServerCnxnFactory#setZooKeeperServer()
This is a simple field assignment.
- Next, continue with the startup of ZooKeeperServer
ZooKeeperServer.java
ZooKeeperServer#startdata()
public void startdata() throws IOException, InterruptedException {
    //check to see if zkDb is not null
    if (zkDb == null) {
        // Initialize the ZK database
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    // Load the data
    if (!zkDb.isInitialized()) {
        loadData();
    }
}
This step creates the ZooKeeperServer's ZK database if needed and loads its data.
ZooKeeperServer#startup()
public synchronized void startup() {
    startupWithServerState(State.RUNNING);
}
ZooKeeperServer#startupWithServerState(State.RUNNING)
private void startupWithServerState(State state) {
    // Session management
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    // Request processors
    setupRequestProcessors();
    // Request throttling
    startRequestThrottler();
    registerJMX();
    startJvmPauseMonitor();
    registerMetrics();
    setState(state);
    requestPathMetricsCollector.start();
    localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
    notifyAll();
}
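The most important steps here are:
- Session management: startSessionTracker()
- Request processor initialization: setupRequestProcessors()
Session management will be analyzed further later. For now, look at request processor initialization.
ZooKeeperServer#setupRequestProcessors()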
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
This is the chain that handles client requests on the server in standalone mode.
With that, the server startup flow in standalone mode is complete.
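The way setupRequestProcessors() wires the chain, building it back to front so that each processor holds a reference to the next, can be sketched in a few lines. Processor, Stage, and run are hypothetical names. The sketch is also synchronous, whereas the listing above calls start() on the Prep and Sync processors, so treat it as an illustration of the wiring only.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a chain of processors in which each stage forwards to the next one.
class ProcessorChainSketch {

    // Hypothetical stand-in for a request processor.
    interface Processor {
        void process(String request);
    }

    static final class Stage implements Processor {
        private final String name;
        private final Processor next;     // null for the last stage
        private final List<String> trace; // records the order of processing

        Stage(String name, Processor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void process(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        }
    }

    // Builds prep -> sync -> final (constructed last-to-first) and pushes one request through.
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        Processor finalStage = new Stage("final", null, trace);
        Processor syncStage = new Stage("sync", finalStage, trace);
        Processor firstStage = new Stage("prep", syncStage, trace);
        firstStage.process(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create"));
    }
}
```

Constructing the last stage first is what lets each earlier stage receive its successor through the constructor, the same order used in setupRequestProcessors().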
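To summarize, the key points are:
- Parse the zoo.cfg configuration file
- Build the NettyServerCnxnFactory, initialize the handlers, and bind the port with bind()
- Initialize the ZK database
- Initialize session management
- Initialize the ZooKeeperServer's request processors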