Apache Zookeeper是一款分布式协调服务开源组件,自身提供高可用保障,因此被广泛用于其他组件的服务高可用保障基础设施。
我们从源码层次解读一下zookeeper服务节点启动流程,从而熟悉zookeeper底层原理和内部核心流程实现逻辑。
下载zookeeper源码(这里选择3.6.1版本),导入IDEA,是一个maven管理的项目。
一、找到启动类
节点启动入口在哪呢?我们需要从bin目录下找到启动脚本 zkServer.sh(Linux操作系统)或者 zkServer.cmd (Windows操作系统),从脚本里面寻找启动类。
因此启动jvm进程的命令就是如图所示方框里面的内容:
nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
"-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
-cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
复制代码
其中 $ZOOMAIN 从脚本里面找一下,其实就是:
从而找到zookeeper节点启动类就是 org.apache.zookeeper.server.quorum.QuorumPeerMain
。
找到这个类,进入main方法,可以看到里面先创建了一个QuorumPeerMain对象,然后调用该对象的initializeAndRun方法,并针对可能出现的各种异常进行捕获并以不同的错误异常码退出进程,最后正常退出(正常情况下Zookeeper节点服务启动后,jvm进程会一直运行)。
/**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
// 新建了一个QuorumPeerMain对象
QuorumPeerMain main = new QuorumPeerMain();
try {
// 使用参数进行初始化并运行
main.initializeAndRun(args);
// 针对运行过程中可能出现的各种异常进行捕获,并以不同的异常码退出服务
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
LOG.info(USAGE);
System.err.println(USAGE);
ZKAuditProvider.addServerStartFailureAuditLog();
ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.err.println("Invalid config, exiting abnormally");
ZKAuditProvider.addServerStartFailureAuditLog();
ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
} catch (DatadirException e) {
LOG.error("Unable to access datadir, exiting abnormally", e);
System.err.println("Unable to access datadir, exiting abnormally");
ZKAuditProvider.addServerStartFailureAuditLog();
ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
} catch (AdminServerException e) {
LOG.error("Unable to start AdminServer, exiting abnormally", e);
System.err.println("Unable to start AdminServer, exiting abnormally");
ZKAuditProvider.addServerStartFailureAuditLog();
ServiceUtils.requestSystemExit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
ZKAuditProvider.addServerStartFailureAuditLog();
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
// 正常退出,一般不会执行到这里
LOG.info("Exiting normally");
ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}
复制代码
这里异常主要包括非法参数异常、配置异常、数据目录访问异常、AdminServer启动异常以及其他异常等。
新建QuorumPeerMain对象比较简单,QuorumPeerMain类没有重写无参构造器,因此没有任何逻辑。
下面重点看下初始化和运行过程:
二、QuorumPeerMain对象初始化和运行
QuorumPeerMain对象的initializeAndRun方法,主要有三部分:
- 构建QuorumPeerConfig对象,并解析参数
- 生成一个数据目录清理管理器
DatadirCleanupManager
,并运行 - 调用runFromConfig(集群分布式模式)或者通过调用ZookeeperServerMain.main(单机本地模式)启动服务
一)解析参数
org.apache.zookeeper.server.quorum.QuorumPeerConfig
是一个配置参数类,其属性是QuorumPeer(Zookeeper启动的服务实例类型,后面可以看到)运行所需的各种参数,同时对部分参数还提供了默认值。
在查看Zookeeper启动脚本的时候,可以看到启动QuorumPeerMain类时,是有传入$ZOOCFG参数的,而这个参数值为 zoo.cfg,因此这一步就是解析这个文件获取到相关配置参数。
然后再这个parse方法里面,加载zoo.cfg配置文件生成Properties对象,然后进一步调用parseProperties
方法获取参数。
二)生成并启动数据目录清理管理器
org.apache.zookeeper.server.DatadirCleanupManager
类主要用于对快照文件和事务日志文件进行自动清理,涉及到有两个参数:‘autopurge.purgeInterval’ 自动清理间隔时长,和 ‘autopurge.snapRetainCount’ 自动清理快照保留个数。
调用了start方法,内部生成了一个Timer定时器以及PurgeTask这个类型的TimerTask,然后以每按 清理间隔时长执行清理任务。
三)启动服务
这里仅以集群启动方法进行探索源码。
runFromConfig方法里面主要是生成了一个QuorumPeer对象,并调用它的start方法执行,最后调用join方法让线程一直等待。
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
MetricsProvider metricsProvider;
try {
metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
config.getMetricsProviderClassName(),
config.getMetricsProviderConfiguration());
} catch (MetricsProviderLifeCycleException error) {
throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
}
try {
ServerMetrics.metricsProviderInitialized(metricsProvider);
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier() != null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if (quorumPeer.isQuorumSaslAuthEnabled()) {
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
if (config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
}
quorumPeer.start();
ZKAuditProvider.addZKStartStopAuditLog();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
} finally {
if (metricsProvider != null) {
try {
metricsProvider.stop();
} catch (Throwable error) {
LOG.warn("Error while stopping metrics", error);
}
}
}
}
复制代码
其中new了一个QuorumPeer对象,并设置它的各种参数,然后调用start方法进行启动,最后join等待停止。
因此每个Zookeeper节点真正启动的是一个 QuorumPeer对象,其他逻辑都在这个类里面,我们后续会重点解读这个类的源码,来探索Zookeeper启动时到底做了哪些事情。