总文档 :文章目录
Github : github.com/black-ant
一 . 前言
水文一篇 , 哈哈哈哈哈哈~~~~~~~~~~~~
文章目的 :
- 梳理 Nacos 创建功能的 Debug 方向
- 梳理 Nacos 组件的模块体系
文章大纲 : 该文档主要涉及以下几个主要的部分
- Nacos 服务发现
- Nacos 配置加载
- Nacos 健康检查
- Nacos 路由策略
PS : 文档参考来源为 官方文档 , 建议阅读文档了解快速使用
二 . 源码的编译
2.1 Nacos源码编译
// Step 1 : 下载源码
https://github.com/alibaba/nacos.git
// Step 2 : 编译 Nacos
mvn -Prelease-nacos -Dmaven.test.skip=true clean install -U
// Step 3 : 运行 Server 文件
nacos_code\distribution\target
复制代码
2.2 Nacos 源码运行
// Step 1 : 下载 Nacos 源码
https://github.com/alibaba/nacos.git
// Step 2 : IDEA 导入 Nacos
此处添加 SpringBoot 启动 , 启动的类为 com.alibaba.nacos.Nacos
// Step 3 : IDEA 修改启动参数
-Dnacos.standalone=true -Dnacos.home=C:\\nacos
-nacos.standalone=true : 单机启动
-Dnacos.home=C:\\nacos : 日志路径
// PS : Nacos Application
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {
public static void main(String[] args) {
SpringApplication.run(Nacos.class, args);
}
}
复制代码
三 . 模块源码
<modules>
<!-- 配置管理-->
<module>config</module>
<!-- Nacos 内核 -->
<module>core</module>
<!-- 服务发现 -->
<module>naming</module>
<!-- 地址服务器-->
<module>address</module>
<!-- 单元测试 -->
<module>test</module>
<!-- 接口抽象 -->
<module>api</module>
<!-- 客户端 -->
<module>client</module>
<!-- 案例 -->
<module>example</module>
<!-- 公共工具 -->
<module>common</module>
<!-- Server 构建发布 -->
<module>distribution</module>
<!-- 控制台,图形界面模块 -->
<module>console</module>
<!-- 元数据管理-->
<module>cmdb</module>
<!-- TODO : 猜测是集成 istio 完成流量控制-->
<module>istio</module>
<!-- 一致性管理 -->
<module>consistency</module>
<!-- 权限控制 -->
<module>auth</module>
<!-- 系统信息管理 Env 读取 , conf 读取 -->
<module>sys</module>
</modules>
复制代码
3.1 Nacos 服务的发现和管理 (c0-c20)
Nacos 对服务的管理主要集中在 Naming 模块中 , 这里结合 Client 端看一下服务的发现和管理是什么逻辑 , 以及该如何去操作他们
3.1.1 控制台获取服务列表
外部接口 :
- C- CatalogController # listDetail : 获取服务详情列表
- C- CatalogController # instanceList: 列出特殊服务的实例
- C- CatalogController # serviceDetail : 服务详情
核心主要是通过 ServiceManager 进行处理 , 此处看一下内部相关的逻辑 :
通过三个接口不难发现 ,其最终的调用核心都是 ServiceManager 类
// 内部类
C01- ServiceManager
// 内部类
PC- UpdatedServiceProcessor
PC- ServiceUpdater
PSC- ServiceChecksum
PC- EmptyServiceAutoClean
PC- ServiceReporter
PSC- ServiceKey :
// 核心参数
F- Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
F- LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
F- ConsistencyService consistencyService;
// 常用方法
M- init : 初始化方法
M- chooseServiceMap : 通过空间名获取 Server 集合
M- addUpdatedServiceToQueue : 更新 Server
M- onChange : server 改变
M- onDelete : server 删除
// 另外 , ServiceManager 还存在一个依赖 : ConsistencyService
C02- ConsistencyService : 一致性服务接口 -> PS:C02_01
M- put :向集群提交一个数据
M- remove :从集群删除一个数据
M- get :从集群获取数据
M- listen :监听集群中某个key的变化
M- unlisten :删除对某个key的监听
M- isAvailable :返回当前一致性状态是否可用
//总结 : 此处的逻辑很简单 , 就是对集合的 CURD 操作 , 核心特点有以下几个 :
1- Delete 时 , 会调用依赖对象 ConsistencyService (DelegateConsistencyServiceImpl) , 用于处理一致性需求
复制代码
PS:C02_01 ConsistencyService 体系结构
当存在多个服务的时候 , 是如何存储的?
// 如上述图所示 , 相关的对象存放在 clusterMap 中
// 注意 , Service 有2个
com.alibaba.nacos.api.naming.pojo.Service
com.alibaba.nacos.naming.core.Service
// Service 的属性
C- Service
I- com.alibaba.nacos.api.naming.pojo.Service
F- Selector selector
F- Map<String, Cluster> clusterMap = new HashMap<>();
F- Boolean enabled
F- Boolean resetWeight
F- String token
F- List<String> owners
复制代码
3.1.2 Nacos 服务和 Config 的控制类
Nacos 中主要通过 NamingService 和 ConfigService 对服务和配置进行控制 , 其底层原理仍然为 : , 这里来简单看一下
ConfigService -> NacosConfigService
NamingService -> NacosNamingService
这2个类归属于 com.alibaba.nacos.client.naming 包
// PS : 注意 ,要使用 nacos-config-spring-boot-starter 和 nacos-discovery-spring-boot-starter 包
复制代码
3.1.3 Nacos 的退出销毁
当我们注册的服务在关闭的时候 , Nacos 会在生命周期结束的时候从 Server 端注销该应用
Step 1 : Closeable 停止相关类 , 停止项目 , 当我们点击停止后 , 可以看到如下一串log
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.beat.BeatReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.EventDispatcher do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.HostReactor do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.PushReceiver do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown begin
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.backups.FailoverReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.core.HostReactor do shutdown stop
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown begin
com.alibaba.nacos.client.naming : [NamingHttpClientManager] Start destroying NacosRestTemplate
com.alibaba.nacos.client.naming : [NamingHttpClientManager] Destruction of the end
com.alibaba.nacos.client.naming : com.alibaba.nacos.client.naming.net.NamingProxy do shutdown stop
这里可以看到 , 这里进行了关闭操作 , 其中主要的 Close 操作都是基于 com.alibaba.nacos.common.lifecycle.Closeable 进行实现
// PS : 此处的调用逻辑为 TODO
复制代码
Step 2 : NacosServiceRegistry 注销
除了这里 Closeable 会关闭外 , 还会注销 Service , 此处主要是NacosServiceRegistry
public void deregister(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
return;
}
NamingService namingService = namingService();
String serviceId = registration.getServiceId();
String group = nacosDiscoveryProperties.getGroup();
try {
namingService.deregisterInstance(serviceId, group, registration.getHost(),
registration.getPort(), nacosDiscoveryProperties.getClusterName());
} catch (Exception e) {
// 省略 log
}
}
// PS : 此处的原理为 继承了 ServiceRegistry , 实现销毁逻辑
C- AbstractAutoServiceRegistration
复制代码
Nacos_Closeable 体系
3.1.4 健康检查流程
Nacos 提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送请求。Nacos 支持传输层 (PING 或 TCP)和应用层 (如 HTTP、MySQL、用户自定义)的健康检查。
健康检查相关接口
- 发送实例心跳 (InstanceController) : /nacos/v1/ns/instance/beat
- 更新实例的健康状态 (HealthController) : /nacos/v1/ns/health/instance
Client 端发起心跳
服务端会定时发起心跳操作调用2个接口 :
C- BeatReactor
PC- BeatTask : 内部类
// 其中会有2个步骤 :
// Step 1 : BeatReactor # addBeatInfo 中添加定时任务
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
//fix #1733
if ((existBeat = dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
dom2Beat.put(key, beatInfo);
// 添加心跳信息
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
// Step 2 : BeatTask 中调用 Server 接口进行心跳操作
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
// PS : 心跳的间隔默认是 5秒 ( com.alibaba.nacos.api.common.Constants)
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
复制代码
服务端检测心跳
// 服务端同样会对实例进行检测 , 核心类为 ClientBeatCheckTask
// Step 1 : ClientBeatCheckTask 的创建
C- Service
?- 在 Service 初始化时 ,即开始了 Task 任务
public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
//............
}
// 这里也可以看到默认时间的设置
public static void scheduleCheck(ClientBeatCheckTask task) {
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
// Step 2 : ClientBeatCheckTask 的运行
C- ClientBeatCheckTask
?- 检查并更新临时实例的状态,如果它们已经过期则删除它们。
public void run() {
//...... 省略
// Step 1 : 获取所有实例
List<Instance> instances = service.allIPs(true);
for (Instance instance : instances) {
// 如果时间大于心跳超时时间 , 则修改健康状态
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
// 如果心跳大于删除时间 , 则删除实例
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
deleteIp(instance);
}
}
}
// PS : 第一次修改的是健康状态 , 后面才修改的实例数
复制代码
ACK 健康检查主要逻辑 ,同时推送信息
无意中发现了一个 ACK 机制 , 是通过事件出发的 . 这个模式看代码主要是为了在 Server 发生变化的时候 ,通过 udpClient , 以 UDP 端口推送更新
PS : 客户端在查询服务实例的时候,如果提供 udp 端口,则 server 会创建 udpClient
for (Instance instance : service.allIPs(Lists.newArrayList(clusterName))) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
instance.setHealthy(valid);
// 发布事件 ServiceChangeEvent
pushService.serviceChanged(service);
break;
}
}
// ServiceChangeEvent 事件的处理
C- PushService
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
// 通过 ServerName 和 命名空间 获取PushClient 集合
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
// 循环所有的 PushClient
for (PushClient client : clients.values()) {
if (client.zombie()) {
clients.remove(client.toString());
continue;
}
Receiver.AckEntry ackEntry;
// 获取缓存 key ,并且从缓存中获取实体数据
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
}
// 构建 ACK 实体类
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
// UDP ACK 校验 , 同时推送 ACK 实体
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
return null;
}
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
// Socket Send
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
复制代码
健康检查阈值的使用
在配置服务时 , 可以配置一个 0-1 的浮点数 , 定义健康检查的阈值 ,该阈值对应的类为 com.alibaba.nacos.api.naming.pojo.Service
C- Service
F- name : 服务名
F- protectThreshold : 健康阈值
F- appName : 应用名
F- groupName : 组名
F- metadata : 元数据
// 阈值的使用
C- InstanceController
M- doSrvIpxt :
// 核心逻辑
double threshold = service.getProtectThreshold();
// IPMap 中可用的健康实例数/服务总数的比例 如果小于阈值 , 则达到保护阈值
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}
PS : 这里联想后面 , Client 做 Balancer 时 , 获取的 Server 实际上就是全部健康的实例了
复制代码
但是阈值的目的是什么呢 ?
假设实例出现了大量的异常 , 那么就会导致最后压力会到那几个健康的实例上 , 这个时候 , 可能会出现连锁反应
为了避免这些情况 ,当达到健康阈值的时候 , 就将所有的实例返回.
这就是那句**ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));**的目的.
虽然Client 端可能碰到异常实例 ,但是可以避免整个系统崩溃
/nacos/v1/ns/health/instance 参数
名称 | 类型 | 是否必选 | 描述 |
---|---|---|---|
namespaceId | 字符串 | 否 | 命名空间ID |
serviceName | 字符串 | 是 | 服务名 |
groupName | 字符串 | 否 | 分组名 |
clusterName | 字符串 | 否 | 集群名 |
ip | 字符串 | 是 | 服务实例IP |
port | int | 是 | 服务实例port |
healthy | boolean | 是 | 是否健康 |
权重的处理
权重主要是 Instance 对象中进行配置
C- Instance
M- instanceId
M- ip
M- port
M- weight : 权重
M- healthy : 健康情况
M- enabled
M- ephemeral
M- clusterName
M- serviceName
M- metadata
复制代码
PS : 权重可以用于 Client 端时进行 权重分配处理
参考原文 @ blog.csdn.net/krisdad/art…
public class NacosWeightLoadBalanceRule extends AbstractLoadBalancerRule {
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {}
@Resource private NacosDiscoveryProperties nacosDiscoveryProperties;
@Override
public Server choose(Object key) {
// 1.获取服务的名称
BaseLoadBalancer loadBalancer = (BaseLoadBalancer) this.getLoadBalancer();
String serverName = loadBalancer.getName();
// 2.此时Nacos Client会自动实现基于权重的负载均衡算法
NamingService namingService = nacosDiscoveryProperties.namingServiceInstance();
try {
Instance instance = namingService.selectOneHealthyInstance(serverName);
return new NacosServer(instance);
} catch (NacosException e) {
e.printStackTrace();
}
return null;
}
@Bean
public IRule getLoadBalancerRule(){
return new NacosWeightLoadBalancerRule();
}
// PS : 个人以为 , 权重是给 Client 端自行处理的
复制代码
其他要点
// Nacos 的默认值
@Value("${nacos.naming.empty-service.auto-clean:false}")
private boolean emptyServiceAutoClean;
@Value("${nacos.naming.empty-service.clean.initial-delay-ms:60000}")
private int cleanEmptyServiceDelay;
@Value("${nacos.naming.empty-service.clean.period-time-ms:20000}")
private int cleanEmptyServicePeriod;
复制代码
3.2 Nacos 配置流程 C30-C60
3.2.1 Nacos 配置的管理
// 获取配置 , 这里主要有几个步骤 :
C30- NacosConfigService
M30_01- getConfigInner(String tenant, String dataId, String group, long timeoutMs)
- 构建 ConfigResponse , 为其设置 dataId , tenant , group
1- 调用 LocalConfigInfoProcessor.getFailover 优先使用本地配置
2- 调用 ClientWorker , 获取远程配置 -> PS:M30_01_01
3- 仍然没有 , LocalConfigInfoProcesso.getSnapshot 获取快照
End- configFilterChainManager 进行 Filter 链处理
// PS:M30_01_01 ClientWorker 的处理
ClientWorker 中进行了远程服务的请求 , 核心代码 :
agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
// 可以看到 , 这里并没有太负载的逻辑 , 仍然是 Rest 请求 : PS 看官网的消息 , 2.0 会采用长连接 , 这里应该会有变动
- Constants.CONFIG_CONTROLLER_PATH : /v1/cs/configs
复制代码
LocalConfigInfoProcessor 主要逻辑
// 这里本地配置是指本地 File 文件 , 这里通过源码推断一下使用的方式 :
C31- LocalConfigInfoProcessor
M31_01- getFailover
- 获取 localPath -> PS:M31_01_01
M32_02- saveSnapshot : 获取成功后 , 会保存快照
?- 保存路径 : 省略\nacos\config\fixed-127.0.0.1_8848_nacos\snapshot\one1\test1
// PS:M31_01_01 localPath 参数
C:\Users\10169\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1
复制代码
Pro 1 : 从这个源码里面 , 可以看到哪些知识点 ?
既然存在本地文件 , 是否意味着我可以通过修改这个路径优先使用本地配置 , 此处修改以下路径后测试成功
省略\nacos\config\fixed-127.0.0.1_8848_nacos\data\config-data\one1\test1
PS : 除了这个路径 , SpringBoot 运行在配置文件中直接配置路径 ->
spring:
cloud:
config:
# 相同配置,本地优先
override-none: true
复制代码
Pro 2 : Filter 的使用
上面可以看到 , 配置的处理中 , 都有个默认的 Filter 处理
configFilterChainManager.doFilter(null, cr);
// 依照这个逻辑 , 是可以进行更多配置的
C32- ConfigFilterChainManager
?- 其中允许自定义添加 Filter
M32_01- addFilter
M32_02- doFilter
复制代码
TODO : 此处如何植入 Filter 待完善 , 没找到接口添加Filter , 奇怪….
3.2.2 Nacos 配置的容灾处理
Nacos LocalConfigInfoProcessor 提供了容灾的功能 , 方式包含2种 : 本地配置和快照处理
本地配置
上面说了 , 修改指定路径即可实现
配置快照
Nacos 的客户端 SDK 会在本地生成配置的快照。当客户端无法连接到 Nacos Server 时,可以使用配置快照显示系统的整体容灾能力。配置快照类似于 Git 中的本地 commit,也类似于缓存,会在适当的时机更新,但是并没有缓存过期(expiration)的概念。
3.2.3 Nacos 动态配置处理
动态配置主要是指配置变更时的监听 :
Nacos 通过长轮询检测配置是否变化 , 对应的核心类为 LongPollingRunnable # checkUpdateDataIds , 对这里 Debug 看下 >>>>
class LongPollingRunnable implements Runnable {
private final int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
@Override
public void run() {
// .... 核心语句
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
}
}
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// It updates when cacheData occours in cacheMap by first time.
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
Map<String, String> params = new HashMap<String, String>(2);
params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
Map<String, String> headers = new HashMap<String, String>(2);
// 长轮询方式
headers.put("Long-Pulling-Timeout", "" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.put("Long-Pulling-Timeout-No-Hangup", "true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
// In order to prevent the server from handling the delay of the client's long task,
// increase the client's read timeout to avoid this problem.
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
// /v1/cs/configs/listener
HttpRestResult<String> result = agent
.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
readTimeoutMs);
if (result.ok()) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData());
} else {
setHealthServer(false);
}
} catch (Exception e) {
setHealthServer(false);
throw e;
}
return Collections.emptyList();
}
// 对应的 Controller 为 ConfigController
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// .............
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
// 对应轮询的接口
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
// Long polling.
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// Compatible with short polling logic.
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
// Before 2.0.4 version, return value is put into header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
Loggers.AUTH.info("new content:" + newResult);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
// LongPollingService
这里想看详情推荐看这一篇 https://www.jianshu.com/p/acb9b1093a54
复制代码
3.2.4 Nacos 元数据
这里来看一下 , Nacos 的元数据是什么 ?
Nacos数据(如配置和服务)描述信息,如服务版本、权重、容灾策略、负载均衡策略、鉴权配置、各种自定义标签 (label),从作用范围来看,分为服务级别的元信息、集群的元信息及实例的元信息。
// 方式一 : 配置服务的时候配置
spring:
application:
name: nacos-config-server
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
metadata:
version: v1
// 方式二 : 页面直接配置
// 方式三 : API 调用 , 通过 Delete , Update 等请求方式决定类型
批量更新实例元数据 (InstanceController) : /nacos/v1/ns/instance/metadata/batch
批量删除实例元数据 (InstanceController) : /nacos/v1/ns/instance/metadata/batch
复制代码
3.3 Nacos 负载均衡
Nacos 的负载均衡归属于 动态 DNS 服务
动态 DNS 服务支持权重路由,让您更容易地实现中间层负载均衡、更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。动态DNS服务还能让您更容易地实现以 DNS 协议为基础的服务发现,以帮助您消除耦合到厂商私有服务发现 API 上的风险。
基于 Feign 相关的知识 , 我们知道 , Balance 的处理是在 BaseLoadBalancer 中结合 Rule 处理的 , 这里我们来分析一下 , 2 者是通过什么结构进行连接处理的
Step 1 : Feign 对 Nacos 的调用
我们以 BaseLoadBalancer 为起点 , 进行 Debug 处理 , 来到点位 PredicateBasedRule
C- PredicateBasedRule
M- choose(Object key)
- Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
?- 此处可以看到 , 其中有一个 lb.getAllServers 的操作 , 此处的 lb 为 DynamicServerListLoadBalancer
// PS : getAllServers , 此处可以看到 ,其中的 Servers 已经全部放在 List 中了
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
// 跟踪一下放入的逻辑 , 放入逻辑的起点是ILoadBalancer Bean 的加载 , 其调用链为 :
C- RibbonClientConfiguration # ribbonLoadBalancer : 构建一个 ILoadBalancer
C- ZoneAwareLoadBalancer : 进入 ZoneAwareLoadBalancer 构造函数
C- DynamicServerListLoadBalancer : 进入 构造函数
C- DynamicServerListLoadBalancer # restOfInit : init 操作
C- DynamicServerListLoadBalancer # updateListOfServers : 更新 Server 列表主流程 , 此处第一次获取相关的 Server List , 后续Debug 第一节点
C- DynamicServerListLoadBalancer # updateAllServerList : 设置 ServerList
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
可以看到 , 其中有2个获取 Server 的逻辑方法 , 在这里看一下家族体系 , 就清楚了
serverListImpl.getUpdatedListOfServers();
filter.getFilteredListOfServers(servers);
// 下述图片中就很清楚了 , 存在一个实现类 NacosServerList 实现类 , 从Nacos 中获取服务列表
private List<NacosServer> getServers() {
try {
String group = discoveryProperties.getGroup();
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return instancesToServerList(instances);
} catch (Exception e) {
throw new IllegalStateException(....);
}
}
// 负载均衡策略
负载均衡策略是基于 Balance
复制代码
3.4 集群的处理
Nacos 集群使用
Nacos 的集群使用比较简单 , 只需要在 /conf/cluster.conf 中配置对应的服务信息即可>>>>
#it is ip
#example
127.0.0.1:8848
127.0.0.1:8849
127.0.0.1:8850
复制代码
Nacos 集群源码跟踪
来看一下源码层面 , 这个逻辑是怎么处理的 ?
核心处理类在 com.alibaba.nacos.core.cluster
// Step 1 : 获取配置的方式
C- EnvUtil
public static String getClusterConfFilePath() {
return Paths.get(getNacosHome(), "conf", "cluster.conf").toString();
}
// 读取 Cluster 配置
public static List<String> readClusterConf() throws IOException {
try (Reader reader = new InputStreamReader(new FileInputStream(new File(getClusterConfFilePath())),
StandardCharsets.UTF_8)) {
return analyzeClusterConf(reader);
} catch (FileNotFoundException ignore) {
List<String> tmp = new ArrayList<>();
String clusters = EnvUtil.getMemberList();
if (StringUtils.isNotBlank(clusters)) {
String[] details = clusters.split(",");
for (String item : details) {
tmp.add(item.trim());
}
}
return tmp;
}
}
// Step 2 : Cluster 的使用
AbstractMemberLookup
// 主要使用集中在 ServerManager 中
C- ServerManager
F- ServerMemberManager memberManager;
C- ServerMemberManager : Nacos中的集群节点管理
M- init : 集群节点管理器初始化
M- getSelf : 获取本地节点信息
M- getmemberaddressinfo : 获取正常成员节点的地址信息
M- allMembers : 获取集群成员节点的列表
M- update : 更新目标节点信息
M- isUnHealth : 目标节点是否健康
M- initAndStartLookup : 初始化寻址模式
// TODO : 其他方法就省略了 , 后期准备进行相关的性能分析 , 集群的源码梳理预计放在那一部分分析
复制代码
总结
这篇文章又是一篇更偏向于应用的文章 , 源码深入的较少 , 更重要的原因是因为 Nacos 的源码分层更清楚 , 结构清晰 , 不需要太负载的深入.
另外 , Nacos 2.0 也在发布中 , 看文档采用了 Socket 长连接的方式 , 后续如果有机会 , 对比一下2者的区别看看.
附录
# 附录一 : 手动调用 Server
package com.alibaba.nacos.discovery.service;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Cluster;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.Service;
import com.alibaba.nacos.api.naming.pojo.healthcheck.AbstractHealthChecker;
import netscape.javascript.JSObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* @Classname NacosClientService
* @Description TODO
* @Date 2021/5/26
* @Created by zengzg
*/
@Component
public class NacosClientNodesService implements ApplicationRunner {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private NamingService namingService;
@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;
@Override
public void run(ApplicationArguments args) throws Exception {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
namingService = NacosFactory.createNamingService(properties);
}
/**
* 获取 Nacos Config
* 参数格式
* {
* "instanceId": "192.168.0.97#9083#DEFAULT#DEFAULT_GROUP@@nacos-user-server",
* "ip": "192.168.0.97",
* "port": 9083,
* "weight": 1, 可以通过权重决定使用的 Server
* "healthy": true,
* "enabled": true,
* "ephemeral": true,
* "clusterName": "DEFAULT",
* "serviceName": "DEFAULT_GROUP@@nacos-user-server",
* "metadata": {
* "preserved.register.source": "SPRING_CLOUD"
* },
* "ipDeleteTimeout": 30000,
* "instanceHeartBeatInterval": 5000,
* "instanceHeartBeatTimeOut": 15000
* }
*
* @param serviceName
* @return
*/
public List<Instance> get(String serviceName) {
List<Instance> content = new LinkedList<Instance>();
try {
content = namingService.getAllInstances(serviceName);
logger.info("------> 获取 Config serviceName [{}] <-------", serviceName);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
return content;
}
/**
* 创建 Nacos Config
*
* @param serviceName
* @param ip
* @param port
*/
public void createOrUpdate(String serviceName, String ip, Integer port) {
try {
logger.info("------> 创建 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", serviceName, ip, port);
namingService.registerInstance(serviceName, ip, port, "TEST1");
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}
/**
* 移除 Nacos Config
*
* @param serviceName
* @param ip
*/
public void delete(String serviceName, String ip, Integer port) {
try {
namingService.deregisterInstance(serviceName, ip, port, "DEFAULT");
logger.info("------> 删除 Config GroupID [{}] -- DataID [{}] Success <-------", serviceName, ip);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}
}
复制代码
# 附录二 : 手动调用 Config
public class NacosClientConfigService implements ApplicationRunner {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private ConfigService configService;
@Value("${spring.cloud.nacos.config.server-addr}")
private String serverAddr;
@Override
public void run(ApplicationArguments args) throws Exception {
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
configService = NacosFactory.createConfigService(properties);
}
/**
* 获取 Nacos Config
*
* @param dataId
* @param groupId
* @return
*/
public String get(String dataId, String groupId) {
String content = "";
try {
content = configService.getConfig(dataId, groupId, 5000);
logger.info("------> 获取 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", dataId, groupId, content);
configService.addListener(dataId, groupId, new ConfigListener());
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
return content;
}
/**
* 创建 Nacos Config
*
* @param dataId
* @param groupId
* @param content
*/
public void createOrUpdate(String dataId, String groupId, String content) {
try {
logger.info("------> 创建 Config GroupID [{}] -- DataID [{}] Success ,The value :[{}] <-------", dataId, groupId, content);
configService.publishConfig(dataId, groupId, content);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}
/**
* 移除 Nacos Config
*
* @param dataId
* @param groupId
*/
public void delete(String dataId, String groupId) {
try {
configService.removeConfig(dataId, groupId);
logger.info("------> 删除 Config GroupID [{}] -- DataID [{}] Success <-------", dataId, groupId);
configService.removeListener(dataId, groupId, null);
} catch (NacosException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
e.printStackTrace();
}
}
}
复制代码
# 附录三 : 使用NacosInjected
可以通过 的
<!-- 使用 Nacos Inject -->
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.7</version>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-discovery-spring-boot-starter</artifactId>
<version>0.2.7</version>
</dependency>
@NacosInjected
private ConfigService configService;
@NacosInjected
private NamingService namingService;
复制代码
# 附录四 : Naocs 官方架构图 (搬运)
这里是纯搬运 , 可以看官方文档 @ nacos.io/zh-cn/docs/…
功能图 :
- 服务管理:实现服务CRUD,域名CRUD,服务健康状态检查,服务权重管理等功能
- 配置管理:实现配置管CRUD,版本管理,灰度管理,监听管理,推送轨迹,聚合数据等功能
- 元数据管理:提供元数据CURD 和打标能力
- 插件机制:实现三个模块可分可合能力,实现扩展点SPI机制
- 事件机制:实现异步化事件通知,sdk数据变化异步通知等逻辑
- 日志模块:管理日志分类,日志级别,日志可移植性(尤其避免冲突),日志格式,异常码+帮助文档
- 回调机制:sdk通知数据,通过统一的模式回调用户处理。接口和数据结构需要具备可扩展性
- 寻址模式:解决ip,域名,nameserver、广播等多种寻址模式,需要可扩展
- 推送通道:解决server与存储、server间、server与sdk间推送性能问题
- 容量管理:管理每个租户,分组下的容量,防止存储被写爆,影响服务可用性
- 流量管理:按照租户,分组等多个维度对请求频率,长链接个数,报文大小,请求流控进行控制
- 缓存机制:容灾目录,本地缓存,server缓存机制。容灾目录使用需要工具
- 启动模式:按照单机模式,配置模式,服务模式,dns模式,或者all模式,启动不同的程序+UI
- 一致性协议:解决不同数据,不同一致性要求情况下,不同一致性机制
- 存储模块:解决数据持久化、非持久化存储,解决数据分片问题
- Nameserver:解决namespace到clusterid的路由问题,解决用户环境与nacos物理环境映射问题
- CMDB:解决元数据存储,与三方cmdb系统对接问题,解决应用,人,资源关系
- Metrics:暴露标准metrics数据,方便与三方监控系统打通
- Trace:暴露标准trace,方便与SLA系统打通,日志白平化,推送轨迹等能力,并且可以和计量计费系统打通
- 接入管理:相当于阿里云开通服务,分配身份、容量、权限过程
- 用户管理:解决用户管理,登录,sso等问题
- 权限管理:解决身份识别,访问控制,角色管理等问题
- 审计系统:扩展接口方便与不同公司审计系统打通
- 通知系统:核心数据变更,或者操作,方便通过SMS系统打通,通知到对应人数据变更
- OpenAPI:暴露标准Rest风格HTTP接口,简单易用,方便多语言集成
- Console:易用控制台,做服务管理、配置管理等操作
- SDK:多语言sdk
- Agent:dns-f类似模式,或者与mesh等方案集成
- CLI:命令行对产品进行轻量化管理,像git一样好用
领域模型 :
SDK 类图 :