在 前文 中,我们已经大概了解了什么是分布式配置中心以及分布式配置中心的原理及实现方式。今天我们根据上节中的配置中心原理来探讨下 nacos 在配置中心方面的实现。
前言
文章中所说源码版本以及 Nacos 官网文档:
- spring-cloud-alibaba-nacos : 2.2.5.RELEASE
- nacos-server: 2.0.2
- nacos-example: master
配置中心原理回顾
在上一节中,我们大概梳理了一下配置中心的实现原理。有如下几点:
- 客户端启动时需要从服务端获取配置并缓存到本地。
- 服务端需要将配置持久化,可提供可视化页面对配置做增删改查操作。
- 监听机制,可以检测到服务端的配置变更进行配置是实时更新。
关于这里提到的监听机制一般有两种模式。一种是 pull 模式,客户端主动向服务端拉取配置,一般采用定时任务请求服务端的方式来实现。一种是 push 模式,在配置发生变更时由服务端主动通知客户端。这两种方式都由一定的优缺点:
- pull 模式:
- 优点:需要时才拉取。可控制拉取的配置。服务端不用关心客户端的连接。
- 缺点:一般有实时要求时,会每秒钟都跑一次定时任务,对资源有一定的消耗。
- push 模式:
- 优化:有配置变更时才通知,客户端不用主动监听,减少资源的占用。
- 缺点:服务端需要保存客户端的连接信息及需要的配置。
Nacos 基本使用及原理
Nacos 介绍
对于 Nacos 介绍这一块,推荐大家还是查看 Nacos 官网查看详细介绍
Nacos 使用
前提条件
需要下载 Nacos 并启动 Nacos server,参考 Nacos 快速入门
- 为你的 Spring Cloud 应用添加依赖。
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
复制代码
注意:版本 2.1.x.RELEASE 对应的是 Spring Boot 2.1.x 版本。版本 2.0.x.RELEASE 对应的是 Spring Boot 2.0.x 版本,版本 1.5.x.RELEASE 对应的是 Spring Boot 1.5.x 版本。
更多版本对应关系参考:版本说明 Wiki
2. 在 bootstrap。properties
中配置 Nacos server 的地址和应用名
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.application.name=example
复制代码
说明:之所以需要配置 spring.application.name ,是因为它是构成 Nacos 配置管理 dataId字段的一部分。
在 Nacos Spring Cloud 中,dataId 的完整格式如下:
${prefix}-${spring.profiles.active}.${file-extension}
复制代码
- prefix 默认为 spring.application.name 的值,也可以通过配置项 spring.cloud.nacos.config.prefix来配置。
- spring.profiles.active 即为当前环境对应的 profile,详情可以参考 Spring Boot文档。 注意:当 spring.profiles.active 为空时,对应的连接符 – 也将不存在,dataId 的拼接格式变成 {file-extension}
- file-exetension 为配置内容的数据格式,可以通过配置项 spring.cloud.nacos.config.file-extension 来配置。目前只支持 properties 和 yaml 类型。
- 通过 Spring Cloud 原生注解 @RefreshScope 实现配置自动更新:
@RestController
@RequestMapping("/config")
@RefreshScope
public class ConfigController {
@Value("${useLocalCache:false}")
private boolean useLocalCache;
@RequestMapping("/get")
public boolean get() {
return useLocalCache;
}
}
复制代码
如此便可简单的使用 Nacos 作为 Spring Cloud 项目的配置中心。
注: 此小节参考 Nacos 官方文档
Nacos 原理源码分析
1. 加载服务端配置并缓存到本地
在 Spring 中有两个外部化配置重要的类,org.springframework.core.env.Environment
及 org.springframework.core.env.PropertySource
。所有的外部化配置都是通过 PropertySource 加载到 Environment 来进行使用。nacos 中其实也是一样,向 Environment 中添加 nacos 的 PropertySource 来完成外部化配置的载入。
PropertySourceBootstrapConfiguration
Spring Cloud 中有个 org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration
配置,来完成 bootstrap 级别的配置载入。
org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration#initialize
该方法实现了 ApplicationContextInitializer 接口,会在 spring 容器初始化完成后执行。
// 自动注入所有的 PropertySourceLocator
@Autowired(required = false)
private List<PropertySourceLocator> propertySourceLocators = new ArrayList<>();
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
CompositePropertySource composite = new CompositePropertySource(
BOOTSTRAP_PROPERTY_SOURCE_NAME);
// 对 propertySourceLocators 数组进行排序
AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
boolean empty = true;
// 获取当前上下文中的 environment
ConfigurableEnvironment environment = applicationContext.getEnvironment();
for (PropertySourceLocator locator : this.propertySourceLocators) {
PropertySource<?> source = null;
// 回调所有的 locate 方法
source = locator.locate(environment);
if (source == null) {
continue;
}
logger.info("Located property source: " + source);
// 将不为空的 PropertySource 添加到 composite 中
composite.addPropertySource(source);
empty = false;
}
if (!empty) {
// 获取 environment 中的 PropertySources
MutablePropertySources propertySources = environment.getPropertySources();
String logConfig = environment.resolvePlaceholders("${logging.config:}");
LogFile logFile = LogFile.get(environment);
if (propertySources.contains(BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
propertySources.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME);
}
// 将不为空的 composite 添加到 environment 中
insertPropertySources(propertySources, composite);
reinitializeLoggingSystem(environment, logConfig, logFile);
setLogLevels(applicationContext, environment);
handleIncludedProfiles(environment);
}
}
复制代码
这里可以看到,我们的 PropertySource 最终是由 locator.locate(environment)
返回的,那 locator.locate(environment)
又是由谁来负责实现呢。在这里肯定是由 nacos 来实现并返回这个 PropertySource 的。
NacosPropertySourceLocator
通过 locator.locate(environment)
方法,我们跟踪到 org.springframework.cloud.alibaba.nacos.client.NacosPropertySourceLocator#locate
。
public class NacosPropertySourceLocator implements PropertySourceLocator {
... 省略部分代码 ...
@Override
public PropertySource<?> locate(Environment env) {
// 获取服务端连接信息
ConfigService configService = nacosConfigProperties.configServiceInstance();
if (null == configService) {
LOGGER.warn(
"no instance of config service found, can't load config from nacos");
return null;
}
long timeout = nacosConfigProperties.getTimeout();
// 初始化 NacosPropertySourceBuilder
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
String name = nacosConfigProperties.getName();
// 获取 nacosConfig 的一些配置。
String nacosGroup = nacosConfigProperties.getGroup();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
List<String> profiles = Arrays.asList(env.getActiveProfiles());
nacosConfigProperties.setActiveProfiles(profiles.toArray(new String[0]));
String fileExtension = nacosConfigProperties.getFileExtension();
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME);
// 加载共享配置
loadSharedConfiguration(composite);
// 加载扩展配置
loadExtConfiguration(composite);
// 加载应用配置
loadApplicationConfiguration(composite, nacosGroup, dataIdPrefix, fileExtension);
return composite;
}
}
复制代码
NacosPropertySourceLocator
实现了 PropertySourceLocator
,而在 org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration
中又会注入所有的 PropertySourceLocator
并进行循环调用。从这里我们知道,如果想自己实现一个配置中心,需要实现一个项对应的 PropertySourceLocator
这里 NacosPropertySourceLocator
一共加载了三种配置,分别是共享配置,扩展配置以及应用自身的配置。
loadApplicationConfiguration
我们这里先不管共享配置已以及扩展配置,不管加载那种配置,最终都是通过访问远程服务端,获取配置,只是传入的参数不一样。
private void loadApplicationConfiguration(
CompositePropertySource compositePropertySource, String nacosGroup,
String dataIdPrefix, String fileExtension) {
// 加载应用配置,这里的 dataIdPrefix ,如果没有特殊配置的话其实就是 spring.application.name。
// 这里我们用的官网的 demo 应用,所以这里其实就是 example.perproties ,也就是向服务端获取 example.perproties 文件的配置
loadNacosDataIfPresent(compositePropertySource,
dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
// 加载 profile 配置,这里如果有配置 profile 的话会依次加载,如 example-dev.peroproties 、example-prod.peroproties
for (String profile : nacosConfigProperties.getActiveProfiles()) {
String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
fileExtension, true);
}
}
复制代码
这里看到加载配置的方法还在 loadNacosDataIfPresent
, 我们继续往下看
loadNacosDataIfPresent
private void loadNacosDataIfPresent(final CompositePropertySource composite,
final String dataId, final String group, String fileExtension,
boolean isRefreshable) {
// 这里我们使用的最简单的 bootstrap.properties 没有配置 refresh 相关的配置。直接看 else
if (NacosContextRefresher.loadCount.get() != 0) {
NacosPropertySource ps;
if (!isRefreshable) {
ps = NacosPropertySourceRepository.getNacosPropertySource(dataId);
}
else {
ps = nacosPropertySourceBuilder.build(dataId, group, fileExtension, true);
}
composite.addFirstPropertySource(ps);
}
else {
// 获取 NacosPropertySource 并加载到 composite 的第一个。
NacosPropertySource ps = nacosPropertySourceBuilder.build(dataId, group,
fileExtension, isRefreshable);
composite.addFirstPropertySource(ps);
}
}
复制代码
nacosPropertySourceBuilder.build
NacosPropertySource build(String dataId, String group, String fileExtension,
boolean isRefreshable) {
// 从远端服务器获取配置数据
Properties p = loadNacosData(dataId, group, fileExtension);
if (p == null) {
p = EMPTY_PROPERTIES;
}
// 根据已有数据组装 NacosPropertySource 并返回。
NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId,
propertiesToMap(p), new Date(), isRefreshable);
NacosPropertySourceRepository.collectNacosPropertySources(nacosPropertySource);
return nacosPropertySource;
}
复制代码
这里终于看到获取服务端数据的方法了。
loadNacosData
private Properties loadNacosData(String dataId, String group, String fileExtension) {
String data = null;
try {
// 从服务端获取远端数据
data = configService.getConfig(dataId, group, timeout);
if (!StringUtils.isEmpty(data)) {
LOGGER.info(String.format("Loading nacos data, dataId: '%s', group: '%s'",
dataId, group));
// 根据扩展名进行不一样的数据解析
if (fileExtension.equalsIgnoreCase("properties")) {
Properties properties = new Properties();
properties.load(new StringReader(data));
return properties;
}
else if (fileExtension.equalsIgnoreCase("yaml")
|| fileExtension.equalsIgnoreCase("yml")) {
YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
yamlFactory.setResources(new ByteArrayResource(data.getBytes()));
return yamlFactory.getObject();
}
}
}
catch (NacosException e) {
LOGGER.error("get data from Nacos error,dataId:{}, ", dataId, e);
}
catch (Exception e) {
LOGGER.error("parse data from Nacos error,dataId:{},data:{},", dataId, data,
e);
}
return null;
}
复制代码
getConfig & getConfigInner
@Override
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return getConfigInner(namespace, dataId, group, timeoutMs);
}
复制代码
这里在 com.alibaba.nacos.client.config.NacosConfigService#getConfig
方法中直接调用的 getConfigInner
我们也直接看 getConfigInner
。
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// 优先使用本地配置
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
log.warn(agent.getName(), "[get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", dataId,
group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
try {
// 如果本地配置为空,则从服务端获取配置信息
content = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
log.warn("NACOS-0003",
LoggerHelper.getErrorCodeStr("NACOS", "NACOS-0003", "环境问题", "get from server error"));
log.warn(agent.getName(), "[get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
dataId, group, tenant, ioe.toString());
}
log.warn(agent.getName(), "[get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", dataId,
group, tenant, ContentUtils.truncateContent(content));
// 如果出现异常,则使用本地快照文件加载配置
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
复制代码
worker.getServerConfig
com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
// 通过 http 请求服务端获取配置
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
log.error(agent.getName(), "NACOS-XXXX",
"[sub-server] get server config exception, dataId={}, group={}, tenant={}, msg={}", dataId, group,
tenant, e.toString());
throw new NacosException(NacosException.SERVER_ERROR, e.getMessage());
}
switch (result.code) {
// check 返回码。如果有正常响应则将内容保存为 snapshot 并返回配置内容
case HttpURLConnection.HTTP_OK:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
return result.content;
// check 返回码。如果有 404 响应则将 snapshot 设置为空并返回空内容
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return null;
// 其他异常情况。
case HttpURLConnection.HTTP_CONFLICT: {
log.error(agent.getName(), "NACOS-XXXX",
"[sub-server-error] get server config being modified concurrently, dataId={}, group={}, tenant={}",
dataId, group, tenant);
throw new NacosException(NacosException.CONFLICT,
"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
log.error(agent.getName(), "NACOS-XXXX", "[sub-server-error] no right, dataId={}, group={}, tenant={}",
dataId, group, tenant);
throw new NacosException(result.code, result.content);
}
default: {
log.error(agent.getName(), "NACOS-XXXX", "[sub-server-error] dataId={}, group={}, tenant={}, code={}",
dataId, group, tenant, result.code);
throw new NacosException(result.code,
"http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
}
}
复制代码
这里可以看到,getServerConfig
里通过 GET 方法请求了服务端的 /v1/cs/configs
。这个方法是服务端的方法,主要作用是返回客户端请求的配置内容。这里的实现我们暂且不看,到这里客户端就从服务端获取到配置内容了。这里依次返回回到最开始的地方,也就是 org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration#initialize
方法,已经将 nacosServer 返回的配置内容加载到 Environment
中,我们的 Spring Cloud 应用就可以获取到 nacos 中添加的配置内容了。
2. 服务端将配置持久化以及对配置做操作
关于第二点,nacos 一般使用 mysql 数据库来做配置的持久化,其实这里就是对 mysql 中数据的增删改查,这里就不做详细介绍,有兴趣的可以查看官方源码。com.alibaba.nacos.config.server.controller.ConfigController#getConfig
可以以此方法为入口查看。
3. 客户端配置的动态感知
客户端
NacosConfigService
在 com.alibaba.nacos.client.config.NacosConfigService#NacosConfigService
构造方法中,当 NacosConfigService 被实例化后,做了一些事情:
- 初始化了 httpAgent ,实际工作类是
ServerHttpAgent
,这里其实主要是用于与服务端连接。 - 初始化了 ClientWorker,httpAgent 作为参数传入,可以猜测内部应该也是请求服务器远端做一些事情。
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
String namespaceTmp = properties.getProperty(PropertyKeyConst.NAMESPACE);
if (StringUtils.isBlank(namespaceTmp)) {
namespace = TenantUtil.getUserTenant();
properties.put(PropertyKeyConst.NAMESPACE, namespace);
} else {
namespace = namespaceTmp;
properties.put(PropertyKeyConst.NAMESPACE, namespace);
}
agent = new ServerHttpAgent(properties);
agent.start();
worker = new ClientWorker(agent, configFilterChainManager);
}
复制代码
ClientWorker
可以看到 ClientWorker
在这里将 httpAgent 维持在了自己内部,而且还创建了两个线程池:
public ClientWorker(final ServerHttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// 初始化一个定时调度的线程池
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
// 初始化一个可缓存的线程池,根据线程名猜测,这个线程池应该是用于与服务端的长轮询。
executorService = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPulling" + agent.getName());
t.setDaemon(true);
return t;
}
});
// 设置定时任务的执行内容
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
log.error(agent.getName(), "NACOS-XXXX", "[sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
复制代码
这里看到两个线程池的初始化:
- 第一个线程池是一个定时调度的线程池,每隔 10ms 就会执行一次 checkConfigInfo() 方法
- 第二个线程池是一个普通的线程池,从线程名猜测是用于服务端的长轮询。
checkConfigInfo
进入 checkConfigInfo
继续往下看:
AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPullingRunnable(i));
}
// 更新当前已启动的任务数
currentLongingTaskCount = longingTaskCount;
}
}
复制代码
这里主要是根据 cacheMap 的大小,将缓存中的数据分给多个线程去处理。
默认情况下,每个长轮询 LongPullingRunnable
任务处理 3000 个监听的配置集。如果超过 3000 则需要多个任务去处理。
LongPullingRunnable#run()
class LongPullingRunnable implements Runnable {
private int taskId;
public LongPullingRunnable(int taskId) {
this.taskId = taskId;
}
public void run() {
try {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
// taskId 用于区分 cacheMap 中的任务批次
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
// 通过本地文件中缓存的数据与 cacheMap 中的数据进行对比,判断数据是否发生变化
checkLocalConfig(cacheData);
// 如果数据发生发辫,通知监听器
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
log.error("NACOS-CLIENT", "get local config info error", e);
}
}
}
// ··· 省略部分代码 ···
} catch (Throwable e) {
log.error("500", "longPulling error", e);
} finally {
executorService.execute(this);
}
}
}
复制代码
在上一个 checkConfigInfo
的循环中,传递了一个 taskId 给 LongPullingRunnable,这里主要是用于在启动了多个 LongPullingRunnable 时可以区分执行的任务批次。
这里判断数据是否发生变化主要在 checkLocalConfig 方法。
checkLocalConfig
private void checkLocalConfig(CacheData cacheData) {
final String dataId = cacheData.dataId;
final String group = cacheData.group;
final String tenant = cacheData.tenant;
File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
// 没有 -> 有
if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
log.warn(agent.getName(),
"[failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
// 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
cacheData.setUseLocalConfigInfo(false);
log.warn(agent.getName(), "[failover-change] failover file deleted. dataId={}, group={}, tenant={}", dataId,
group, tenant);
return;
}
// 有变更
if (cacheData.isUseLocalConfigInfo() && path.exists()
&& cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
log.warn(agent.getName(),
"[failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
}
复制代码
检查本地配置,主要有三种情况:
- 如果 isUseLocalConfigInfo 为 false,但是本地文件是存在的,那么把 isUseLocalConfigInfo设置为 true,并且更新 cacheData 的内容以及内容的版本号(文件最后更新时间)
- 如果 isUseLocalConfigInfo 为 true,但是本地文件不存在,则设置为 false,不通知监听器。
- 如果 isUseLocalConfigInfo 为 true, 本地缓存文件也存在。但是 cacheDate 内容的版本时间和文件的更新时间补一致。则说明配置内容有变更,更新 cacheData 中的内容,并且设置 isUseLocalConfigInfo 为 true。
checkListenerMd5
回到 com.alibaba.nacos.client.config.impl.ClientWorker.LongPullingRunnable#run
方法中:
try {
// 通过本地文件中缓存的数据与 cacheMap 中的数据进行对比,判断数据是否发生变化
checkLocalConfig(cacheData);
// 如果数据发生发辫,通知监听器
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
log.error("NACOS-CLIENT", "get local config info error", e);
}
复制代码
这里 checkLocalConfig 检查到数据有变化后,则会执行 checkListenerMd5 ;
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
复制代码
遍历所有的监听器,如果发现数据的 md5 值不同,则发送通知。
safeNotifyListener
private void safeNotifyListener(final String dataId, final String group, final String content,
final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = new Runnable() {
public void run() {
// ··· 省略部分代码 ···
// 发送配置变更的通知
listener.receiveConfigInfo(contentTmp);
listenerWrap.lastCallMd5 = md5;
}
}
复制代码
receiveConfigInfo
这里相应的实现类为: NacosContextRefresher
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// many Spring context
if (this.ready.compareAndSet(false, true)) {
this.registerNacosListenersForApplications();
}
}
private void registerNacosListenersForApplications() {
// ··· 省略部分代码 ···
registerNacosListener(nacosPropertySource.getGroup(), dataId);
}
private void registerNacosListener(final String group, final String dataId) {
Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
loadCount.incrementAndGet();
String md5 = "";
if (!StringUtils.isEmpty(configInfo)) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))
.toString(16);
}
catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
LOGGER.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);
}
}
refreshHistory.add(dataId, md5);
// 发送 RefreshEvent 通知。
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Refresh Nacos config group{},dataId{}", group, dataId);
}
}
@Override
public Executor getExecutor() {
return null;
}
});
try {
configService.addListener(dataId, group, listener);
}
catch (NacosException e) {
e.printStackTrace();
}
}
复制代码
这里可以看到在 onApplicationEvent 方法(spring 容器准备好以后的回调) 中,注册了相应的 registerNacosListener
监听器。在配置变更时,使用 RefreshEvent 通知客户端(@RefreshScope 注解生效)。
检查服务端配置
回到 com.alibaba.nacos.client.config.impl.ClientWorker.LongPullingRunnable#run
中,先通过本地配置的读取和检查来判断数据是否发生变化,如果发生变化,则发送通知(cacheData.checkListenerMd5()
)。
接着,当前线程还需要去远程服务器上获取最新的数据,检查哪些数据发生了变化:
List<String> inInitializingCacheList = new ArrayList<String>();
// check server config
// 从服务端获取变化了的 dataId 列表
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// 遍历有变化的 groupKey,请求远端获取指定的 groupKey 内容
String content = getServerConfig(dataId, group, tenant, 3000L);
// 获取的内容设置到 cacheDate 中
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
log.info(agent.getName(), "[data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
log.error(agent.getName(), "NACOS-XXXX",
"[get-update] get changed config exception. dataId={}, group={}, tenant={}, msg={}",
dataId, group, tenant, ioe.toString());
}
}
// 再遍历所有的 cacheData ,找到变化的数据进行通知。
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
复制代码
这里主要做了几个操作:
- 通过 checkUpdateDataIds 获取远端有数据变更的 dataId 列表。
- 遍历这些变化了的 dataId ,调用 getServerConfig 获取对应的内容。
- 获取到数据后,更新本地的 cacheDate。
- 最后再遍历 cacheData,找到变化的数据进行通知。
这里的 getServerConfig 以及 checkListenerMd5 前面都有说到,这里就略过了。与上面所提到的基本一致。我们主要来看 checkUpdateDataIds 方法:
checkUpdateDataIds
首先从cacheData 中获取 isUseLocalConfigInfo 为 false 的数据,拼装请求参数后请求 checkUpdateConfigStr
/**
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
*/
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// cacheData 首次出现在cacheMap中&首次check更新
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
复制代码
checkUpdateConfigStr
/**
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
*/
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
// 超时时间
long timeout = TimeUnit.SECONDS.toMillis(30L);
// 设置请求头
List<String> headers = new ArrayList<String>(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
// 发送 http 请求,请求 /listener 接口
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
// 如果服务端返回正常,则从返回的请求中解析 dataId 并返回。
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR) {
log.error("NACOS-0007", LoggerHelper.getErrorCodeStr("Nacos", "Nacos-0007", "环境问题",
"[check-update] get changed dataId error"));
}
log.error(agent.getName(), "NACOS-XXXX", "[check-update] get changed dataId error, code={}",
result.code);
}
} catch (IOException e) {
setHealthServer(false);
log.error(agent.getName(), "NACOS-XXXX", "[check-update] get changed dataId exception, msg={}",
e.toString());
}
return Collections.emptyList();
}
复制代码
这里我们看到,客户端请求服务端时,设置了一个 30s 的超时时间。请求了 /listener 接口,并且服务端的返回只有数据变化的 dataId ,不涉及任何配置内容。
接下来我们继续看服务端在收到这个请求的时候做了什么
服务端
ConfigController#listener
在 com.alibaba.nacos.config.server.controller.ConfigController#listener
中可以看到,最终是调用的 inner.doPollingConfig 来处理。
/**
* The client listens for configuration changes.
*/
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
LOGGER.warn("invalid probeModify is blank");
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
复制代码
inner.doPollingConfig
/**
* 轮询接口.
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
// Long polling.
// 如果支持长轮询,走这个溶剂
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// Compatible with short polling logic.
// 兼容短轮询
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
// Before 2.0.4 version, return value is put into header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
复制代码
这里的短轮询其实就是一个正常的 http 请求。请求过来后对比 md5,不管有无配置更新都直接返回给客户端。
我们主要看看这个长轮询
longPollingService.addLongPollingClient
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
// 获取请求头中的超时时间
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
// 缓冲时间
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
// 获取客户端传递过来的超时时间并减掉缓冲时间(提前 500 毫秒,避免客户端超时),作为最终的超时时间
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// Do nothing but set fix polling timeout.
} else {
long start = System.currentTimeMillis();
// 根据客户端请求过来的 md5 和服务端对应 groupKey 做对比。如果不一致则通过 generateResponse 直接将结果返回。
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// Must be called by http thread, or send response.
// AsyncContext 是 Servlet3.0 中提供的对象,调用 startAsync 后,这个请求的响应会被延后,并释放容器分配的线程。
// 以此来实现长轮询的机制。
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
复制代码
这个方法主要做了如下几个事情:
- 获取客户端请求参数的超时时间,进行本地计算。比如这里我们传递的超时时间为 30S,然后再减掉 500ms 的响应时间,最终超时时间为 29.5S。
- 在收到请求时,就根据客户端请求的 md5 和服务器对应groupKey 下对应内容的 md5 进行比较,如果有配置变更,则不在意超时时间,直接将结果返回。
- 如果当前请求时没有配置变化,则封装
ClientLongPolling
任务交给ConfigExecutor.executeLongPolling
去处理。
ClientLongPolling#run()
@Override
public void run() {
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
// Delete subscriber's relations.
boolean removeFlag = allSubs.remove(ClientLongPolling.this);
if (removeFlag) {
if (isFixedPolling()) {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
List<String> changedGroups = MD5Util
.compareMd5((HttpServletRequest) asyncContext.getRequest(),
(HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
// 客户端返回
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} else {
LogUtil.DEFAULT_LOG.warn("client subsciber's relations delete fail.");
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);
}
复制代码
在 ClientLongPolling
的 run 方法中做了什么呢。看到这里启动了一个延迟任务,在 29.5S 后,再次检查对应数据的内容是否发生了变化,不管有无变化都正常响应客户端。
在这里我们发现了一些事情。如果在没有配置变化的情况下,客户端的 checkUpdateConfigStr
请求。为了从服务端获取有配置变化的 dataId 列表,居然被服务端卡主了 29.5S?29.5 后再给客户端正常的响应,文章的最上方有简单的介绍监听机制的 pull 机制,其实这里 nacos 使用的也是 pull 的机制(由客户端主动请求服务端获取),这种机制的缺点之一是定时请求服务端造成的资源消耗。但是在 nacos 这种长轮询下,好像这个点不是什么问题!
但是这里好像又有一个问题,如果所有的请求都需要 29.5s 才能有正常的响应,那就是我们的配置变更在极端情况下需要 29.5 后才会通知到客户端。这样好像达不到实时的要求。
这里必须要实现的是,当用户在 nacos 的控制台修改了配置以后,可以直接返回给客户端。
allSubs#add
allSubs 是一个队列,队列中存放的是 ClientLongPolling
对象:
/**
* ClientLongPolling subscibers.
*/
final Queue<ClientLongPolling> allSubs;
复制代码
LongPollingService
在 LongPollingService
的构造方法中,使用了 NotifyCenter.registerSubscriber
订阅了一个事件,如果这个事件为 LocalDataChangeEvent
,也就是服务端本地的数据发生变化的话,就会执行一个 DataChangeTask
线程。
public LongPollingService() {
allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
// Register LocalDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe LocalDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// Ignore.
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
@Override
public Class<? extends Event> subscribeType() {
return LocalDataChangeEvent.class;
}
});
}
复制代码
DataChangeTask#run
在 com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run
方法中:
@Override
public void run() {
try {
ConfigCacheService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// If published tag is not in the beta list, then it skipped.
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
continue;
}
// If published tag is not in the tag list, then it skipped.
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships.
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
RequestUtil
.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
"polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
}
}
复制代码
这个方法中从 allSubs 中遍历所有 ClientLongPolling
,通过 clientSub.sendResponse
将数据返回给客户端。所以这里就可以理解为什么数据变化可以实时触发更新了。
服务端这里的逻辑:
- 收到请求后 check 是否有数据变化,如果有数据变化,直接响应客户端。
- 如果没有数据变化,启动一个延迟任务,延迟 29.5S 后相应客户端。
- 当有配置变更时,触发
DataChangeTask
,响应客户端的请求。
总结
今天这里大概说了下 nacos 做为配置中心的实现,主要从以下几个方面:
- 客户端请求服务端获取配置并缓存到本地, 通过
NacosPropertySourceLocator
进行加载。 - 服务端对配置的增删改查操作。
- 客户端配置的动态感知,通过
com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr
方法与服务端建立长轮询来完成配置的动态感知。
其中提到一个长轮询的机制,其实简单的说就是客户端在 pull 服务端的时候,设置一个较长的超时时间(30S),服务端在收到客户端的请求时,如果无配置变更不马上响应,而是将该次请求挂起 29.5s。客户端在没收到请求的响应时,会一直等待,避免在普通短轮询机制下的空轮询,避免资源的白白消耗。
而且在客户端长轮询的这段期间,如果有配置变更,会通过事件通知并直接打破 29.5s 的等待直接返回。这里有点类似 push 机制,在服务端有配置更新时主动通知客户端,不过这里不用长期记录客户端所需的一些信息,这里只有 29.5S, 而且信息都是客户端请求时携带过来,不会增加服务端的复杂程度。