SpringCloud 服务发现:DiscoveryClient

服务发现:DiscoveryClient

服务发现由客户端和服务端进行配合完成的。本节先探讨客户端中机制。

本文删减源码中部分不重要的步骤,完全过程请参考源码;

另外,本文所作的源码分析是在 spring-boot 应用中,且包含 actuator 依赖的前提下;

本文使用的 eureka-client 的版本为 1.9.21spring-boot版本为 2.3.0.RELEASEspring-cloud 的版本为 Hoxton.SR5

代码初始化入口

com.netflix.discovery.DiscoveryClient#DiscoveryClient(
    ApplicationInfoManager applicationInfoManager, 
    EurekaClientConfig config, 
    AbstractDiscoveryClientOptionalArgs args,
    Provider<BackupRegistry> backupRegistryProvider, 
    EndpointRandomizer endpointRandomizer)
复制代码

客户端中的核心机制

  • 续约
  • 状态更新
  • 抓取注册表

服务发现客户端的初始化流程

image-20210328114629731.png

可以看到,客户端是使用了三个部分分别完成以下功能:

  • 续约(发送心跳)
  • 刷新缓存(抓取注册表)
  • 实例状态复制(更新本地实例状态到远程注册中心)

TimedSupervisorTask 实现

可以看到,以上提到的三个主要功能有两个是由 TimedSupervisorTask 进行调度的。所以这里先对 TimedSupervisorTask 进行分析。

这个类是 TimerTask 的一个实现类,但是 DiscovertClient 将其通过 scheduler#schedule 方法进行调度的时候,是将这个任务当做一个 Runnable 使用的,所以我们不关注它作为 TimerTask 的功能。

它的重要属性:

  • 三个计数器,用于对任务调度执行的情况进行统计,分别是 超时、拒绝以及异常的情况

    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    复制代码
  • 需要被调度的任务:

    private final Runnable task
    复制代码
  • 用来调度自身的一个调度线程池:

    private final ScheduledExecutorService scheduler;
    复制代码
  • 用于执行异步任务的线程池,这个线程池用于执行被这个类所包裹的任务:

    private final ThreadPoolExecutor executor;
    复制代码
  • 被调度的任务的超时时间,超过这个时间,就认为任务执行超时:

    private final long timeoutMillis
    复制代码
  • 下次调度执行自身的延时,也就是某次自身调度结束之后,经过多久时间,下一次调度开始,这个时间会随着超时次数进行变化,以减少无效的调度:

    private final AtomicLong delay
    复制代码
  • 最大的调度自身的时间间隔。因为任务超时时,每次延迟的时间变成两倍,这个最大间隔则是延迟时间增长的上限,避免无限制推迟自身调度:

    private final long maxDelay
    复制代码
  • 记录执行器的活跃线程数

    private final LongGauge threadPoolLevelGauge;
    复制代码

这个类中的代码不多,除了一个构造器之外,就只是一个实现的 run 方法,如下:

@Override
public void run() {
    Future<?> future = null;
    try {
        // 以任务的执行器来提交要执行的任务(具体可能是续约,抓取注册表等任务),这里是异步任务
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // 这里用超时获取结果的方法获取结果,如果任务超时,会抛出TimeoutException异常,进入到catch块
        // 只有在超时时间内正常执行完成获得结果,才会继续执行 try 部分的之后语句
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
        // 走到这里,说明此次任务执行成功了,将延迟恢复到初始状态
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
    } catch (TimeoutException e) {
        logger.warn("task supervisor timed out", e);
        timeoutCounter.increment();
		// 超时了,记录当前延迟
        long currentDelay = delay.get();
        // 设置延迟为2倍,取最大延迟时间和翻倍延迟时间中的最小值,也就是延迟时间最大不能超过maxDelay
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        // 设置延迟时间
        delay.compareAndSet(currentDelay, newDelay);

    } catch (RejectedExecutionException e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, reject the task", e);
        } else {
            logger.warn("task supervisor rejected the task", e);
        }

        rejectedCounter.increment();
    } catch (Throwable e) {
        if (executor.isShutdown() || scheduler.isShutdown()) {
            logger.warn("task supervisor shutting down, can't accept the task");
        } else {
            logger.warn("task supervisor threw an exception", e);
        }

        throwableCounter.increment();
    } finally {
        // 如果走到这里,调用取消方法(只有非正常执行的时候有意义)
        if (future != null) {
            future.cancel(true);
        }

        if (!scheduler.isShutdown()) {
            // 以之前设置的延迟,进行自身的下一次调度
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}
复制代码

总结:

  1. 真正要执行的任务以异步任务的形式用另一个线程池进行执行,达成了超时机制;
  2. 如果任务每次都正常执行,则任务就是相当于按照一开始的超时时间进行定时调度。而如果任务执行超时了(此时可能网络不同或者服务端故障),则推迟下次调度时机,以减少无效的调度;
  3. 实现功能时,也对执行情况进行了数据收集,便于进行分析运行情况;

刷新缓存机制

从之前的代码可以看到,真正的续约任务是以 com.netflix.discovery.DiscoveryClient.CacheRefreshThread 的形式托管给了 TimedSupervisorTask 进行调度。

如下是 CacheRefreshThread :

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
复制代码

方法入口:com.netflix.discovery.DiscoveryClient#refreshRegistry。接下来进行注册表的刷新步骤分析。

image-20210330085549044.png

总的来说,全量更新是直接获取服务器的所有注册表,而增量更新是从服务器端获取增量的部分,然后本地根据服务端所提供的数据中包含的应用的操作类型(增加、修改或者删除)对本地数据进行更新(在 com.netflix.discovery.DiscoveryClient#updateDelta 方法中)。而服务端在客户端没有提供版本号的情况下是如何返回增量数据的,需要之后对服务端的实现进行分析后得到答案。

关于刷新缓存的机制的频率,由参数 registryFetchIntervalSeconds 决定,默认值 30 秒。

续约机制

再次回顾 DiscoveryClient 的启动过程,可以看到,和刷新注册表的任务一样,续约任务(发送心跳任务)也是通过类似的模式实现的调度。

下面是 com.netflix.discovery.DiscoveryClient.HeartbeatThread 的类声明:

private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
复制代码

com.netflix.discovery.DiscoveryClient.HeartbeatThread 包裹了 com.netflix.discovery.DiscoveryClient#renew 方法的调用,然后将自身作为一个 Runnable 传递给 TimedSupervisorTask 进行调度。也就是心跳机制也会受到 超时-推迟 这个机制的保护。

它的内部逻辑并不多,接下来,我们看看它的内部逻辑:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // 通过API向服务端发送心跳请求
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // 从服务端返回了 404,会尝试进行注册操作
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            // 注册成功,返回true
            return success;
        }
        // 状态码为200,返回true
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
复制代码

总共的逻辑就两点:

  • 如果发送心跳成功,就 返回 true
  • 如果发送心跳请求的响应状态码是 404,就发起一次注册请求,注册成功,则返回 true

关于续约的频率: 由 renewalIntervalInSecs 参数决定,默认值为 30 秒。

服务注册

关于服务注册,有两点需要进行了解

  1. 服务注册的时机
  2. 服务注册的逻辑

首先,看下服务注册的逻辑,方法入口在 com.netflix.discovery.DiscoveryClient#register

/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        // 调用API将自己的实例信息提交到注册中心
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
复制代码

整个注册的过程其实逻辑很简单,只是调用 API 将自身的实例信息传到注册中心。

其次,看看注册的时机:

  1. DiscoveryClient 构造的时候,也就是客户端启动的时候,调用一次注册方法;
  2. 其次是在 renew 方法中,如果向服务器发送心跳时,响应状态码是 404 时,调用注册方法;
  3. 在实例复制器的逻辑中 com.netflix.discovery.InstanceInfoReplicator#run ,如果判断实例信息脏了,进行注册;

实例信息刷新机制

在了解实例的信息刷新机制之前,我们先了解下,代表实例信息的类 com.netflix.appinfo.InstanceInfo

类中的重要属性如下:

/**
 * 客户端实例的实例id,在同一个应用下唯一,例如 192.168.18.109:blog-service:8010
 */
private volatile String instanceId;
/**
 * 实例的所属应用的名称,默认是服务名称的大写形式,例如 BLOG-SERVICE
 */
private volatile String appName;
/**
 * 实例的ip地址
 */
private volatile String ipAddr;
/**
 * 客户端的端口,默认为 7001 
 */
private volatile int port = DEFAULT_PORT;
/**
 * 客户端的安全端口,默认为 7002
 */
private volatile int securePort = DEFAULT_SECURE_PORT;
/**
 * 虚拟 ip 地址(逻辑地址),服务名称,比如: blog-service
 */
private volatile String vipAddress;
/**
 * 安全的虚拟 ip 地址
 */
private volatile String secureVipAddress;
/**
 * 是否使用安全的端口,默认为 false
 */
private volatile boolean isSecurePortEnabled = false;
/**
 * 是否使用不安全的端口,默认为 true
 */
private volatile boolean isUnsecurePortEnabled = true;
/**
 * 数据中心的信息
 */
private volatile DataCenterInfo dataCenterInfo;
/**
 * 主机名称
 */
private volatile String hostName;
/**
 * 实例状态
 */
private volatile InstanceStatus status = InstanceStatus.UP;
/**
 * 用于记录外部对当前 instance 修改的状态,初值为 UNKNOWN。
 * 这个状态仅在 Server 端是有意义的。其意义就是通过修改 Server 端 instanceInfo 的 overriddenStatus 的值
 * 来达到修改 Server 端对应 instanceInfo 的 status 的值的目的。
 */
private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
/**
 * 实例信息是否 “脏” 了,如果本地发送了变化,但是没有同步到服务器,这个字段就是 true
 */
private volatile boolean isInstanceInfoDirty = false;
/**
 * 续约信息
 */
private volatile LeaseInfo leaseInfo;
/**
 * 是否是候选的服务发现的 server
 */
private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
/**
 * 客户端的元数据,用户可以通过配置的方式自定义
 */
private volatile Map<String, String> metadata;
/**
 * 上次在服务端更新信息的时间戳
 */
private volatile Long lastUpdatedTimestamp;
/**
 * 上次实例信息发生变化的时间戳
 */
private volatile Long lastDirtyTimestamp;
/**
 * 在服务端,实例的信息发生的变化的类型(ADDED, MODIFIED, DELETED)
 */
private volatile ActionType actionType;
/**
 * 实例信息的版本字段
 */
private String version = VERSION_UNKNOWN;
复制代码

其中 status 字段保持了客户端实例的服务状态,这个枚举 InstanceStatus 可选的值为:

  • UP: 能够接收流量;
  • DOWN: 不要发送流量,健康检查失败;
  • STARTING: 正在启动,不要发送流量;
  • OUT_OF_SERVICE: 不提供服务,不要发送流量,一般是 shutdown 的时候用;
  • UNKNOWN: 未知的状态;

这个类中保存了用于注册的信息以及被其它组件进行发现(使用)的信息。

实例信息刷新的方法是: com.netflix.discovery.DiscoveryClient#refreshInstanceInfo。下图是它的时序图:

image-20210401205718537.png

其中,ApplicationInfoManager 是对实例信息做了简单的管理。

图中涉及的几个类:

  • EurekaHealthCheckHandler : spring-cloud-netflix-eureka-client 包中的类,继承 com.netflix.appinfo.HealthCheckHandler 集成 spring 的健康检查机制。它通过 applicationContext.getBeansOfType(HealthIndicator.class) 获取容器中的健康指示器的 bean 作为健康检查的方式。
  • org.springframework.boot.actuate.health.HealthIndicator: 健康指示器,来自于 spring-boot-actuator 包,提供 Health health() 方法获取健康相关的信息。图中涉及的健康检查的实现类有(引入的依赖不同,健康检查器的列表就不同,以下仅供参考):
    • org.springframework.cloud.netflix.hystrix.HystrixHealthIndicator
    • org.springframework.boot.actuate.system.DiskSpaceHealthIndicator
    • org.springframework.boot.actuate.health.PingHealthIndicator: 总是返回 up 的健康指示器,是默认的实现类,是保底机制;
    • org.springframework.cloud.health.RefreshScopeHealthIndicator
  • org.springframework.boot.actuate.health.SimpleStatusAggregatororg.springframework.boot.actuate.health.StatusAggregator 的简单实现,聚合一组状态得到一个结果状态;
  • com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener: 监听实例状态发生改变的监听器接口,是观察者;

总的来说,就是更新三部分的实例信息

  • 更新主机 hostnameip 信息;
  • 更新续约信息;
  • 通过健康检查器更新实例的状态

下文会提到,刷新实例信息的调用时机只是在实例信息复制器中,也就是 com.netflix.discovery.InstanceInfoReplicator#run

实例信息复制器

实例信息复制器的主要功能就是刷新当前客户端实例的信息,如果有变化则同步到服务端。

主要代码实现是在 com.netflix.discovery.InstanceInfoReplicator 类中。

其中主要的属性如下:

/**
 * 用于和服务器通信
 */
private final DiscoveryClient discoveryClient;
/**
 * 实例信息的一个引用
 */
private final InstanceInfo instanceInfo;
/**
 * 进行复制的间隔秒数
 */
private final int replicationIntervalSeconds;
/**
 * 用于调度自身进行复制的调度器,核心线程数为 1
 */
private final ScheduledExecutorService scheduler;
/**
 * 用于存储正在调用执行的异步任务的结果引用,主要是用于强制更新时的任务取消
 */
private final AtomicReference<Future> scheduledPeriodicRef;
/**
 * 表示复制器是否启动
 */
private final AtomicBoolean started;
/**
 * 限流机制的实现,主要限流的对象是强制执行复制这个操作
 */
private final RateLimiter rateLimiter;
/**
 * 上限大小,这个数值与一定时间内可以通过的操作次数大小呈正相关
 */
private final int burstSize;
/**
 * 由 burstSize 和 replicationIntervalSeconds 两个数值计算所得:
 * 60 * this.burstSize / this.replicationIntervalSeconds
 */
private final int allowedRatePerMinute;
复制代码

而启动的方法是 com.netflix.discovery.InstanceInfoReplicator#start ,这个对象的构建以及这个方法的构建都在 com.netflix.discovery.DiscoveryClient#initScheduledTasks 方法中,调用了 start 方法之后,提交第一次延迟调度任务。如下:

public void start(int initialDelayMs) {
    if (started.compareAndSet(false, true)) {
        // 这样设置是为了第一次调度时,将实例的状态复制到远程服务器
        instanceInfo.setIsDirty();
        // 第一次调度自己
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        // 保存提交的任务的引用
        scheduledPeriodicRef.set(next);
    }
}
复制代码

提交之后,执行的主体就到了 run 方法:

public void run() {
    try {
        // 调用客户端实例的刷新信息的方法
        discoveryClient.refreshInstanceInfo();

        // 如果实例信息发生了变化,进行注册
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // 调用API注册客户端
            discoveryClient.register();
            // 设置实例信息状态为同步完成的
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // 无论发生了什么,下一次的调度都会开始
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        // 保存新一次调度任务的引用
        scheduledPeriodicRef.set(next);
    }
}
复制代码

可以看出,复制器做的事情就是:

  1. 刷新本地客户端实例的信息(包括状态);
  2. 有必要的话,通过 API 以注册的方式同步本地客户端实例的信息到远程服务器;

另外,复制器除了调度的机制,还有强制复制的主动复制方式,方法是 onDemandUpdate

public boolean onDemandUpdate() {
    // 首先确认限流器允许这次手动更新,才进行更新
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        // 如果调度器没有关闭,才进行处理
        if (!scheduler.isShutdown()) {
            // 异步执行手动更新的任务
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");
					// 获取到正在执行的正常调度的更新任务(包括正常的定时更新任务以及之前可能的手动更新触发的任务)
                    Future latestPeriodic = scheduledPeriodicRef.get();
                    // 假如获取到这个任务赋值给本地变量后后,上一次的任务刚刚执行到 run 方法的最后一行,
                    // 是可能存在多个任务同时在线程池的队列的吧?(疑问)
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        // 如果之前的任务没有完成,取消它
                        latestPeriodic.cancel(false);
                    }
					// 调用 run 方法进行更新
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}
复制代码

这里的逻辑是,通过限流器的限制后,会将仍然在运行中的之前的任务进行取消,然后执行更新任务。

根据类的描述:一个新的更新任务总是在一个更早的更新任务结束后进行调度。然而,如果一个手动执行的任务开始,调度的自动更新任务会被丢弃,接着,一个新的自动调度任务会在手动执行更新的最后进行调度。

这个机制里面,根据上一个代码块中的注释,是怀疑可能存在多个任务同时在线程池的队列,这样就可能存在和类描述不符合的情况。这个疑问暂时不深入探索了,之后再补 TODO

强制更新的时机有两个:

  1. 注册了健康检查器的时候;
  2. 接收到实例状态更新事件时;

实例状态的复制的频率由两部分决定:

  1. instanceInfoReplicationIntervalSeconds 参数决定,默认值为 30 秒调度一次;
  2. onDemandUpdateStatusChange 参数如果为 true,则表示每次实例状态发生改变,则通知到实例状态改变监听器,这个监听器调动复制器的手动更新方法进行更新,默认值为 true

限流机制

上一小节提到,复制器使用限流器对手动执行的更新任务的频率进行控制。

现在我们对限流器进行一些分析。

限流器的实现类是 com.netflix.discovery.util.RateLimiter

首先看类的注释:

/**
 * Rate limiter implementation is based on token bucket algorithm. There are two parameters:
 * <ul>
 * <li>
 *     burst size - maximum number of requests allowed into the system as a burst
 * </li>
 * <li>
 *     average rate - expected number of requests per second (RateLimiters using MINUTES is also supported)
 * </li>
 * </ul>
 *
 * @author Tomasz Bak
 */
复制代码

上面的内容说的是:

  1. 限流器是使用令牌桶算法实现的;
  2. burst size,是在一定时间内,能够允许的最大请求数(也就是令牌桶的桶大小);
  3. average rate,表示期待的每秒的请求数(也支持分钟,这个单位在构造器指定);

类中的重要属性是:

/**
 * 构造器传入的时间单位转为毫秒单位需要的转化乘积,
 * 主要是为了统一 [外部计算的单位时间n内的令牌个数的这个n的单位] 与 [内部的毫秒单位]
 */
private final long rateToMsConversion;
/**
 * 当前已经消费的令牌数量
 */
private final AtomicInteger consumedTokens = new AtomicInteger();
/**
 * 上次向桶里装填令牌的时间
 */
private final AtomicLong lastRefillTime = new AtomicLong(0);
复制代码

类的主要供外部调用的用于获取令牌的方法是 acquire 方法:

public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
    // 如果限流的桶大小和平均流量不符合要求,直接放过
    if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
        return true;
    }

    // 如果有必要的话,重新填充令牌桶
    refillToken(burstSize, averageRate, currentTimeMillis);
    // 消费一个令牌
    return consumeToken(burstSize);
}
复制代码

可以看到,这个方法中显示进行填充桶,然后再进行令牌消费。

先看看填充桶的方法 refillToken

// 获取上次填充桶的时间
long refillTime = lastRefillTime.get();
// 计算此刻到上次填充桶的时间的间隔
long timeDelta = currentTimeMillis - refillTime;

// averageRate指定是期待的限流的速率
// 如果写出 timeDelta / rateToMsConversion * averageRate 更符合我个人的思维,也就是将时间除以单位换算,
// 然后乘以指定单位期待的平均的速率就是当前变化的时间内能够产生的token数量
// 但是按照我更容易理解的写法,那可能算出的结果就不一样了,因为这里是向下取整 (这里的计算顺序因为设置到取整确实是个很细节的写法)
long newTokens = timeDelta * averageRate / rateToMsConversion;
// 如果新产生的token大于0,才进入代码块
if (newTokens > 0) {
    // 如果 refillTime 等于 0 就设置新的填充时间戳为当前时间
    // 如果不是第一次填充,就计算得出填充时间,避免因为之前计算新 token 触发的向下取整导致的不准确
    long newRefillTime = refillTime == 0
        ? currentTimeMillis
        : refillTime + newTokens * rateToMsConversion / averageRate;
    // 设置新的填充时间,如果 CAS 设置成功,才进行下一步,否则就放弃这次已经过期(落后于其他线程)的计算
    // 如果失败,说明其它的线程已经基于这时间点填充了 token
    if (lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
        // 一直循环到
        while (true) {
            // 这里面要考虑并发的情况,这里是线程安全的,因为每次 CAS 设置已经消费的 token 数失败后,
            // 都会重新计算相关的数据,获取到已经消费的token数量
            // 同时,因为 lastRefillTime 已经 CAS 设置成功,则别的线程不会重复对这个时间点生成 token
            int currentLevel = consumedTokens.get();
            // 从当前已经消费的 token 和 总的桶大小选出较小数(因为桶的大小是每次方法调用时传入的,可能会变小)
            int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
            // 计算新的已经消费的 token 的数量为 (之前消费的数量减去新生成的token数量),最小为0
            int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
            // CAS 设置成功才退出循环
            if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
                return;
            }
        }
    }
}
复制代码

总结来说,refillToken 方法就是计算当前时间到上一次计算的时间之间应该新生成的 token 数量,放到桶里。

接下来,看看 consumeToken 方法:

private boolean consumeToken(int burstSize) {
    while (true) {
        // 获取当前已经消费的 token 数量
        int currentLevel = consumedTokens.get();
        // 如果当前消费的数量不小于桶数量,就返回 false,获取令牌失败了
        if (currentLevel >= burstSize) {
            return false;
        }
        // 经过上面的 if 判断后,这里是可以尝试消费的情景,如果通过 CAS 设置成功,则获取 token 成功
        // 如果 CAS 失败,则继续循环,直到这两个 if 有一个条件为 true
        if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
            return true;
        }
    }
}
复制代码

消费的方法就是尝试将 consumedTokens 的值加一而不超过桶大小。

服务下线

服务下线的方法是 com.netflix.discovery.DiscoveryClient#unregister,如下:

void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            // 调用 API 取消注册
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}
复制代码

方法中的逻辑简单,只是调用服务端的 API 取消注册。

客户端关闭

客户端关闭的方法是 com.netflix.discovery.DiscoveryClient#shutdown ,如下:

public synchronized void shutdown() {
    // 如果 CAS 成功,才进入关闭的逻辑 (这里既然用了 CAS,方法上的 synchronized 关键字是不是多余了)
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
		// 取消注册状态改变监听器
        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }

        // 取消之前调度的任务 (包括刷新注册表、发送心跳、实例信息复制器)
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
            && clientConfig.shouldRegisterWithEureka()
            && clientConfig.shouldUnregisterOnShutdown()) {
            // 设置实例状态为 DOWN
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // 取消实例的注册
            unregister();
        }

        // 关闭通信客户端
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }

        // 关闭监控相关监视器
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient");
    }
}
复制代码

主要做的工作就是将客户端初始化过程做的事情取消和关闭。

其它重要的类

  • com.netflix.discovery.shared.Applications:保存了所有从 eureka-server 拿到的注册表信息;
  • com.netflix.discovery.util.ThresholdLevelsMetric:状态监视器(跟抓取注册表有关);

相关配置

eureka 将配置以接口的方式抽象出来,调用方只需要更改不同的实现类就可以实现不同的加载配置的方式。

这个接口是 com.netflix.discovery.EurekaClientConfig,每一个方法代表一个配置。

放到 spring-cloud 中,这个配置接口的实现类是:

  • org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean: 被 @ConfigurationProperties("eureka.instance") 注解修饰,对应关于实例的配置;
  • org.springframework.cloud.netflix.eureka.EurekaClientConfigBean : 被 @ConfigurationProperties("eureka.client") 注解修饰,对应关于客户端的配置;

先看实例相关的重要配置:

/**
 * actuator的前缀
 */
private String actuatorPrefix = "/actuator";

/**
 * app名称,默认从环境取值 spring.application.name
 */
private String appname = UNKNOWN;

/**
 * 实例用于接收流量的不安全的端口
 */
private int nonSecurePort = 80;

/**
 * 安全通信的端口
 */
private int securePort = 443;

/**
 * 是否启用不安全的端口来接收流量
 */
private boolean nonSecurePortEnabled = true;

/**
 * 是否启用安全的端口
 */
private boolean securePortEnabled;

/**
 * 续约的时间间隔
 */
private int leaseRenewalIntervalInSeconds = 30;

/**
 * 经过多久没有续约,服务端会认为这个实例应该不可用。
 * 这个值至少要比 leaseRenewalIntervalInSeconds 大。
 * 如果这个值过大,则可能会将流量导向一个实际不可用的服务,如果值过小,则可能导致实例因为短暂的网络波动就被剔除了。
 */
private int leaseExpirationDurationInSeconds = 90;

/**
 * 虚拟主机名,默认 spring.application.name
 */
private String virtualHostName = UNKNOWN;

/**
 * 在应用下唯一的实例id,在 spring-cloud 中,这个值默认为由
 * org.springframework.cloud.commons.util.IdUtils#getDefaultInstanceId(resolver) 计算得到
 */
private String instanceId;


/**
 * 实例的键值对信息,作为元数据,其它服务也可以进行访问
 */
private Map<String, String> metadataMap = new HashMap<>();

/**
 * 数据中心信息
 */
private DataCenterInfo dataCenterInfo = new MyDataCenterInfo(
    DataCenterInfo.Name.MyOwn);

/**
 * 客户端的 ip 地址
 */
private String ipAddress;


/**
 * 健康检查的 path
 */
private String healthCheckUrlPath = actuatorPrefix + "/health";

/**
 * 倾向于使用ip而不是hostname
 */
private boolean preferIpAddress = false;

/**
 * 初始的实例状态信息
 */
private InstanceStatus initialStatus = InstanceStatus.UP;
复制代码

再看客户端相关的重要属性:

/**
 * 是否启用客户端
 */
private boolean enabled = true;

/**
 * 通信客户端配置
 */
@NestedConfigurationProperty
private EurekaTransportConfig transport = new CloudEurekaTransportConfig();

/**
 * 获取注册表信息的间隔
 */
private int registryFetchIntervalSeconds = 30;

/**
 * 实例信息复制到服务端的间隔
 */
private int instanceInfoReplicationIntervalSeconds = 30;

/**
 * 初始什么时候开始复制实例信息到服务端
 */
private int initialInstanceInfoReplicationIntervalSeconds = 40;

/**
 * 拉取服务端信息的间隔
 */
private int eurekaServiceUrlPollIntervalSeconds = 5 * MINUTES;

/**
 * 从服务端的读取信息的超时时间
 */
private int eurekaServerReadTimeoutSeconds = 8;

/**
 * 连接服务的超时时间
 */
private int eurekaServerConnectTimeoutSeconds = 5;

/**
 * 备用的降级用的注册表实现,用于从服务端获取注册表失败的情况下
 */
private String backupRegistryImpl;

/**
 * 从客户端到所有的服务端的连接数的总和
 */
private int eurekaServerTotalConnections = 200;


/**
 * 心跳的连接池的任务失败后的下一次任务的调度延迟时间增长所能达到的最大倍数
 */
private int heartbeatExecutorExponentialBackOffBound = 10;


/**
 * 刷新注册表的连接池的任务失败后的下一次任务的调度延迟时间增长所能达到的最大倍数
 */
private int cacheRefreshExecutorExponentialBackOffBound = 10;

/**
 * 一连串的用于与服务端通信的 URL
 */
private Map<String, String> serviceUrl = new HashMap<>();


/**
 * 是否启用 gzip 压缩网络通信数据
 */
private boolean gZipContent = true;

/**
 * 这个实例是否要注册到服务端,以被其它组件发现与使用
 */
private boolean registerWithEureka = true;


/**
 * 是否禁用增量抓取注册表,默认不禁用
 */
private boolean disableDelta;


/**
 * 是否从服务端抓取注册表信息
 */
private boolean fetchRegistry = true;


/**
 * 在实例状态发生变化时,是否强制立即进行实例信息复制
 */
private boolean onDemandUpdateStatusChange = true;


/**
 * 客户端是否在关闭时,显式的从服务端下线
 */
private boolean shouldUnregisterOnShutdown = true;

/**
 * 客户端是否要强制在初始化的时候注册
 */
private boolean shouldEnforceRegistrationAtInit = false;
复制代码

总结

整个 eureka-client 负责的工作,已经分析完毕了。其中不少机制都是让我收获颇丰,比如 超时-推迟 机制、限流机制。

值得学习的是:

  • CAS 的用法实例;
  • 任务的调度方式;
  • 限流机制的实现;
  • 扩展性的考虑;
  • 配置的抽象方式;

参考资料

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享