1. 源码编译
1.1 下载源码
从github上选择Tag 1.4.1进行下载。nacos
1.2 编译
用Idea打开项目,并对整个项目进行编译。特别注意要对nacos-consistency 1.4.1这个目录,执行compile操作,否则会报错。
1.3 单机启动
增加VM参数
-Dnacos.standalone=true
复制代码
大功告成。
2. Nacos核心功能点
- 服务注册:Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。
- 服务心跳:在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认5s发送一次心跳。
- 服务健康检查:Nacos Server会开启一个定时任务用来检查注册服务实例的健康情况,对于超过15s没有收到客户端心跳的实例会将它的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复 发送心跳则会重新注册)
- 服务发现:服务消费者(Nacos Client)在调用服务提供者的服务时,会发送一个REST请求给Nacos Server,获取上面注册的服务清单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新的注册表信息更新到本地缓存
- 服务同步:Nacos Server集群之间会互相同步服务实例,用来保证服务信息的一致性。
3. 整体思路
看源码之前,先整理一下思路,假设我们自己写一个Nacos的话,包含几个模块呢。
- Nacos服务端:作用是提供服务注册、服务发现、心跳任务、服务同步以及服务健康检查功能。
- Nacos客户端:作用是给每一个微服务提供调用方法,跟Nacos服务端进行通信。
那么Nacos客户端怎么跟服务端进行通信呢,无外乎就是HTTP调用,那么需要Nacos服务端提供调用接口。
4. 源码入口
参考之前Nacos的使用文章,Nacos,我们发现依赖文件里会有下面这么一个依赖。
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
复制代码
这是利用了SpingBoot自动装配原理,引入了一个nacos的starter,有过SpringBoot自动装配基础的同学应该知道,在对应的包里会有spring.factories文件,文件里会有真正的调用类存在,让我们来看下。
好,我们找到了这个文件,查看该文件的内容。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
复制代码
EnableAutoConfiguration这个就是自动装配的Key,后边的value值就是真正的调用类。这里直接看第一个类NacosDiscoveryAutoConfiguration。
4.1 NacosDiscoveryAutoConfiguration
给类有三个Bean,其中最后一个Bean是核心Bean。
@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
复制代码
4.2 NacosAutoServiceRegistration
4.2.1 构造方法
这里又调用了父类的构造方法
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
复制代码
4.3 AbstractAutoServiceRegistration
4.3.1 构造方法
该类的构造方法没啥东西,就是把NacosServiceRegistry赋值给serviceRegistry
protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry,
AutoServiceRegistrationProperties properties) {
this.serviceRegistry = serviceRegistry;
this.properties = properties;
}
复制代码
但是从该类的类结构图中可以看到,该类实现了Spring里的事件监听接口,那么意味着当Spring容器启动之后,会执行该类的onApplicationEvent方法。
4.3.2 onApplicationEvent
这里又调用了bind方法。
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
复制代码
4.3.3 bind()
看源码有个技巧就是遇到这种if判断然后直接return的,可以先不看,因为大概率不是主流程。
@Deprecated
public void bind(WebServerInitializedEvent event) {
//获取应用上下文
ApplicationContext context = event.getApplicationContext();
if (context instanceof ConfigurableWebServerApplicationContext) {
if ("management".equals(((ConfigurableWebServerApplicationContext) context)
.getServerNamespace())) {
return;
}
}
//CAS设置端口
this.port.compareAndSet(0, event.getWebServer().getPort());
//执行启动方法
this.start();
}
复制代码
4.3.4 start()
直接看核心方法register()
public void start() {
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
return;
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
register();
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
this.running.compareAndSet(false, true);
}
}
复制代码
4.3.5 register()
这里就调用了,前边NacosServiceRegistry的register方法。
protected void register() {
this.serviceRegistry.register(getRegistration());
}
复制代码
4.4 NacosServiceRegistry
4.4.1 register()
这里可以从打印出的日志上看到执行完registerInstance方法之后,服务注册完成,所以源码入口函数已经找到了。
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
//获取服务ID
String serviceId = registration.getServiceId();
//获取服务实例,包括服务IP、端口、权重、集群名称以及元数据
Instance instance = getNacosInstanceFromRegistration(registration);
try {
//执行注册逻辑
namingService.registerInstance(serviceId, instance);
//可以从日志上看到,上边方法执行完之后,一个微服务就注册完毕了,所以入口已经找到
log.info("nacos registry, {} {}:{} register finished", serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}
复制代码
至此源码入口函数已经找到,目前所有代码都是在Nacos的客户端代码,也就是提供给微服务的代码,并没有涉及到Nacos服务端的代码。
5. 服务注册
5.1 Nacos Client端
5.1.1 NacosNamingService
看该类的registerInstance方法
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
//判断是不是临时实例
if (instance.isEphemeral()) {
//设置心跳信息
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
//调用代理类注册服务
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
复制代码
这里直接看注册实例的代码,if里的是心跳服务的代码,现在先不看。
5.1.2 NamingProxy
这里就是向Nacos服务端发送请求,请求的地址为”/nacos/v1/ns/instance”,类型是POST,进行实例的注册,底层用的是HttpClient。
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
复制代码
下面我们来看看,Nacos服务端接到了这个请求是如何处理的。
5.2 Nacos Server端
对于这种HTTP请求,在Server端一定会有一个Controller进行接收,我们直接查InstanceController。
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
复制代码
说明关于实例的处理逻辑都在这个Controller里。
5.2.1 InstanceController.register()
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//获取namespaceId
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//获取服务名称
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
//校验服务名称不能为空
NamingUtils.checkServiceNameFormat(serviceName);
//从请求参数中解析instance属性,并重新赋值
final Instance instance = parseInstance(request);
//注册实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
复制代码
5.2.2 ServiceManager
这个类是具体的处理类,可以想象为我们写代码时的XXXService类。
5.2.2.1 registerInstance()
逻辑很清晰,Nacos的源码里,很多方法都不超过Idea的一屏,看着很清爽。该方法创建了服务实例。
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//创建服务,并放入缓存
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//从缓存中获取服务
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//添加服务实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
复制代码
5.2.2.2 createServiceIfAbsent
createEmptyService方法直接调用了createServiceIfAbsent。
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
复制代码
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
//从缓存中获取服务
Service service = getService(namespaceId, serviceName);
//由于是第一次注册,所以一定为空
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
//这里cluster为空
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
//放入服务并初始化
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
复制代码
5.2.2.3 putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException {
//将service放入缓存
putService(service);
//对服务进行初始化
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
复制代码
5.2.2.4 putService()
DCL放入缓存的数据结构中
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
复制代码
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
复制代码
5.2.2.5 init()
这里边是健康检查的逻辑,下边分析。
public void init() {
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
//循环进行服务健康检查
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
复制代码
5.2.2.6 addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
//创建临时实例的Key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
//从缓存中获取服务
Service service = getService(namespaceId, serviceName);
//将服务放入内存中
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//调用DelegateConsistencyServiceImpl.put方法
consistencyService.put(key, instances);
}
}
复制代码
5.3 DelegateConsistencyServiceImpl
5.3.1 put
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
复制代码
由于是临时实例,会走阿里自己实现的AP模式的Distro协议。
5.4 DistroConsistencyServiceImpl
5.4.1 put
public void put(String key, Record value) throws NacosException {
onPut(key, value);
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
复制代码
5.4.2 onPut
这里调用了notifier.addTask,其中notifier是一个Runnable
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, DataOperation.CHANGE);
}
复制代码
5.4.3 notifier.addTask
这里的tasks是一个阻塞队列,这里就是把任务放入阻塞队里。
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
复制代码
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
}
复制代码
那么什么时候获取这个任务呢? 既然Notifier是一个Runnable,那么就一定会执行他的run方法。
5.4.4 notifier.run
可以看到在run方法里,执行了一个死循环,不停的从队列里边拿任务,如果没有任务就阻塞住。
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
Pair<String, DataOperation> pair = tasks.take();
//处理任务
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
复制代码
那么什么时候执行run方法呢?
5.4.5 init
可以看到在执行完构造方法之后,就会执行init()方法,并新建一个notifier任务,放入线程池里。
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
复制代码
5.4.6 handle(pair)
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
//获取了当前任务,就要把缓存里的当前服务删掉
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
//循环处理
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
//更新动作走更新逻辑,这里走的是该逻辑
if (action == DataOperation.CHANGE) {
//这里调用的是Service.onChange方法
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
//删除动作走删除逻辑
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO
.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
复制代码
5.5 Service
5.5.1 onChange
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
//循环获取实例
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
//异常参数校验
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
//异常参数校验
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
//更新实例列表,这里命名不准确
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
复制代码
5.5.2 updateIPs
这里直接看这一段,更新实例列表
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
复制代码
5.6 Cluster
5.6.1 updateIps
这里用了CopyOnWrite思想,在写的时候复制一份副本。
public void updateIps(List<Instance> ips, boolean ephemeral) {
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
// do not update the ip validation status of updated ips
// because the checker has the most precise result
// Only when ip is not marked, don't we update the health status of IP:
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
if (ip.isHealthy() != oldIP.isHealthy()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
}
if (ip.getWeight() != oldIP.getWeight()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
ip.toString());
}
}
}
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
复制代码
5.7 PushService
更新完实例之后,会主动推送给客户端,这里Nacos用的是主动推送的方式,当注册中心的服务列表更新之后,直接推送给客户端。
该类还是实现了ApplicationListener接口,所以依然会执行onApplicationEvent方法
5.7.1 onApplicationEvent
方法太长,不全贴了,只看这一行,这里用的是UDP发送的,而不是向Zookeeper用的是TCP。
udpPush(ackEntry);
复制代码
至此服务注册流程完毕。
6. 服务心跳
看代码之前,我们先想一下,心跳服务是做什么用的。首先心跳服务用来检测客户端是否还存活而设置的一种服务,服务端和客户端之间不可能一直维持着连接,所以会通过客户端定时发送心跳服务至服务端,来证明服务还存活。
之前看客户端代码时,注册实例之前,会先向服务端发送一个心跳服务。
6.1 客户端
6.1.1 NacosNamingService.registerInstance
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
//临时实例,发送心跳服务
if (instance.isEphemeral()) {
//构建心跳对象
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
//设置心跳周期,默认是5秒,每隔5秒由客户端向服务端发送心跳请求
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
//心跳请求
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
复制代码
6.1.2 BeatReactor.addBeatInfo
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
//在线程池中放入一个心跳任务
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}
复制代码
6.1.3 BeatTask.run
在run()方法中一直执行BeatTask任务,保证只要客户端一直可以给服务端发送心跳指令。
public void run() {
if (beatInfo.isStopped()) {
return;
}
//发送心跳服务,这里会返回0
long result = serverProxy.sendBeat(beatInfo);
//设置下一次心跳服务时间
long nextTime = result > 0 ? result : beatInfo.getPeriod();
//延迟任务,每隔5秒执行
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
复制代码
下面来看服务端代码
6.2 服务端
6.2.1 InstanceController.beat
代码太长了,这里说说流程。
- 先从请求值里解析参数
- 构建所需参数
- 获取Instance实例,没有就注册
- 获取service实例,没有就注册
- service.processClientBeat(clientBeat)具体处理逻辑
6.2.2 Service.processClientBeat
public void processClientBeat(final RsInfo rsInfo) {
ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
//执行clientBeatProcessor任务
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
复制代码
6.2.3 ClientBeatProcessor.run
这里就是更新客户端实例的最后心跳时间
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
instance.setLastBeat(System.currentTimeMillis());
//由于客户端发送了心跳请求,所以这里就把原本不健康的服务,更新为健康的
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG
.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(),
UtilsAndCommons.LOCALHOST_SITE);
//向客户端重新推送健康服务
getPushService().serviceChanged(service);
}
}
}
}
}
复制代码
7. 服务健康检查
上文在注册服务的时候有一个Service.init()方法,里边包含了服务健康检查的逻辑
public void init() {
//提交服务健康检查任务
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
复制代码
7.1 ClientBeatCheckTask
7.1.1 run()
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
List<Instance> instances = service.allIPs(true);
// first set health status of instances:
for (Instance instance : instances) {
//服务超过15秒无响应
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
//服务不健康
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
//推送给客户端
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
//服务超过30秒无响应
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
//调用删除接口,删除服务
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
复制代码
8. 服务发现
服务发现是在第一次调用服务接口时根据服务名去服务端获取的,这个请看ribbo源码,这里看下底层逻辑。
8.1 客户端
8.1.1 NacosNamingService.getAllInstances
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
//获取服务列表
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
复制代码
8.1.2 HostReactor
8.1.2.1 getServiceInfo
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
//获取服务对象
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
//缓存里没有就重新生成
if (null == serviceObj) {
//构建服务对象
serviceObj = new ServiceInfo(serviceName, clusters);
//放入服务缓存Map
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
//放入更新缓存中
updatingMap.put(serviceName, new Object());
//更新服务
updateServiceNow(serviceName, clusters);
//从缓存中删除
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
//如果服务正在更新
if (UPDATE_HOLD_INTERVAL > 0) {
//同步将对象锁住
synchronized (serviceObj) {
try {
//等5秒
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
//放入线程池中执行,定时更新客户端的服务缓存
scheduleUpdateIfAbsent(serviceName, clusters);
//从缓存中获取服务对象
return serviceInfoMap.get(serviceObj.getKey());
}
复制代码
8.1.2.2 updateServiceNow
public void updateServiceNow(String serviceName, String clusters) {
//获取服务
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
//从服务端获取服务列表
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result);
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}
}
复制代码
8.1.2.3 scheduleUpdateIfAbsent
新建UpdateTask执行
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
复制代码
8.1.2.4 UpdateTask.run
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
//缓存里没有就重新从服务端获取
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
//服务不是最新的,就重新从服务端获取
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
//如果服务已经被服务端主动Push过,这里不应该覆盖。(这段代码感觉没有用,因为只是发了一个请求,本地不更新任何数据)
refreshOnly(serviceName, clusters);
}
//放入线程池执行任务
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
lastRefTime = serviceObj.getLastRefTime();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
复制代码
8.2 服务端
8.2.1 InstanceController.list
前边就是从请求值里边获取参数、构建参数,重点逻辑在doSrvIpxt方法里。(吐槽下这个命名。。。)
8.2.2 doSrvIpxt()
该方法作用就是从实例中获取服务的所有信息。
8.2.3 service.srvIPs
从集群中获取所有实例
8.2.4 allIPs
public List<Instance> allIPs(List<String> clusters) {
List<Instance> result = new ArrayList<>();
for (String cluster : clusters) {
Cluster clusterObj = clusterMap.get(cluster);
if (clusterObj == null) {
continue;
}
//获取所有实例
result.addAll(clusterObj.allIPs());
}
return result;
}
复制代码
8.2.5 clusterObj.allIPs()
从注册的信息中,获取所有实例。
public List<Instance> allIPs() {
List<Instance> allInstances = new ArrayList<>();
allInstances.addAll(persistentInstances);
allInstances.addAll(ephemeralInstances);
return allInstances;
}
复制代码
至此单机版的Nacos注册中心源码分析完毕,最后附上一张整体流程图。