因为篇幅问题,这篇是上一篇的续集部分,包含自我保护机制与对等复制相关内容,上一篇地址
自我保护机制
自我保护机制用于感知自身网络不正常的情况。
在 evict
方法的开始,就说明了,如果条件 !isLeaseExpirationEnabled()
为真,就跳过这次调用。那看看这个方法:
@Override
public boolean isLeaseExpirationEnabled() {
// 如果服务端配置不允许自我保护机制,就永远返回 true
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
// 获取上一分钟续约的数量,如果不大于应该续约的阈值,注册中心就会认为自己遭受到网络分区了
// numberOfRenewsPerMinThreshold 是
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
复制代码
看完这个方式,我们可能会有两个疑问:
getNumOfRenewsInLastMin()
:如何计算上一分钟的续约数量的;numberOfRenewsPerMinThreshold
:从哪来的;
我们先看第一个问题,如何计算上一份中的续约数量:
方法如下:
private final MeasuredRate renewsLastMin;
@Override
public long getNumOfRenewsInLastMin() {
return renewsLastMin.getCount();
}
复制代码
这里就涉及到 MeasuredRate
这个类了,我们分析这个类看看做了什么:
/**
* Utility class for getting a count in last X milliseconds.
*
* @author Karthik Ranganathan,Greg Kim
*/
public class MeasuredRate {
private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);
// 代表上一段时间的bucket,记录上一段时间的数量
private final AtomicLong lastBucket = new AtomicLong(0);
// 代表当前正在计算的时间段的bucket中的数量;
private final AtomicLong currentBucket = new AtomicLong(0);
// 代表每个桶的采样持续时长(间隔)
private final long sampleInterval;
// 计时器,用于切换桶
private final Timer timer;
// 是否启动
private volatile boolean isActive;
/**
* @param sampleInterval in milliseconds
*/
public MeasuredRate(long sampleInterval) {
this.sampleInterval = sampleInterval;
this.timer = new Timer("Eureka-MeasureRateTimer", true);
this.isActive = false;
}
public synchronized void start() {
if (!isActive) {
// 启动定时任务
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// 获取当前桶的值,并将当前桶的值设置为0,
// 并把当前桶的值设置给上一个桶
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
public synchronized void stop() {
if (isActive) {
// 取消定时任务
timer.cancel();
isActive = false;
}
}
/**
* Returns the count in the last sample interval.
*/
public long getCount() {
// 获取上个区间的计数
return lastBucket.get();
}
/**
* Increments the count in the current sample interval.
*/
public void increment() {
// 为当前区间自增计数
currentBucket.incrementAndGet();
}
}
复制代码
简单来说,这个类就是每隔一段时间,将当前桶的值保存到上一个桶中,当前桶置为0之后,为新的一个时段通过调用 increment
增加计数。
通过点击 com.netflix.eureka.util.MeasuredRate#increment
寻找调用的位置,
可以看到在 com.netflix.eureka.registry.AbstractInstanceRegistry#renew
方法中调用了。
第二个问题,numberOfRenewsPerMinThreshold
是怎么配置的?
通过查看这个属性的用处,可以看到在方法
com.netflix.eureka.registry.AbstractInstanceRegistry#updateRenewsPerMinThreshold
中:
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
复制代码
expectedNumberOfClientsSendingRenews
记录的是期待进行续约的客户端(服务实例)的数量。
serverConfig.getExpectedClientRenewalIntervalSeconds()
指的是期待的客户端续约间隔(秒)。
serverConfig.getRenewalPercentThreshold()
是续约的百分比阈值。
例如,服务实例数量10个,续约间隔为30秒(默认值),阈值为0.85(默认值)的情况下:
# 向下取整
numberOfRenewsPerMinThreshold = 10 * (60 / 30) * 0.85
复制代码
可以得出每分钟的续约阈值为 17。
总结:自我保护机制通过计算续约数量感知自身是否处于网络分区。
对等复制
多个 EurekaServer
之间是对等的关系,只剩一个节点,也依然可以提供服务。
对等复制功能是为了在多个 EurekaServer
之间同步数据,目的是多个 EurekaServer
能够拥有一致的数据,能独立的提供服务。对等复制涉及到复杂的网络关系,如果设计不好,会碰到两个问题:1. 网络中大量的请求;2. 数据冲突;
对等复制的处理涉及两个端:复制的发起端和复制的接收端。
复制的发起端
复制发起端的实现,在于 PeerAwareInstanceRegistryImpl
中,这里对相关的请求处理进行了重写,例如 renew
:
public boolean renew(final String appName, final String id, final boolean isReplication) {
// 如果父类成功处理请求
if (super.renew(appName, id, isReplication)) {
// 处理对等复制
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
复制代码
在 replicateToPeers
方法中:
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
// 统计时间的
Stopwatch tracer = action.getTimer().start();
try {
// 如果是复制,统计的计数器自增
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// 如果没有对等节点,并且已经是复制的请求了,直接return进行跳过
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// 遍历所有的节点
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// 如果 node 代表自己,则跳过,不用复制给自己
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// 处理复制操作
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
复制代码
在 replicateInstanceActionsToPeers
方法中:
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
// 根据要复制的不同操作,选择不同的处理逻辑
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
// 从注册表中获取最新的信息
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
} finally {
CurrentRequestVersion.remove();
}
}
复制代码
以注册的复制为例,node.register(info)
的调用是将信息封装成任务提交到任务分发器中,之后分批执行。
public void register(final InstanceInfo info) throws Exception {
// 这个信息的过期时间
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// 提交任务,taskId用于区分同一类事件,用于去除旧的同类事件,避免无效复制
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
// 调用客户端注册
return replicationClient.register(info);
}
},
expiryTime
);
}
复制代码
对等复制之后的分批执行涉及的类为:
com.netflix.eureka.cluster.ReplicationTask
:表示一次复制任务,具体的任务类型由实现类决定,且其中实现了处理错误的方法,用于对服务端返回的特殊响应 (404状态码) 进行一些处理com.netflix.eureka.util.batcher.TaskDispatchers
:创建TaskDispatcher
的匿名内部类,并返回实例com.netflix.eureka.util.batcher.TaskDispatcher
:将复制任务投递到AcceptorExecutor
实例中com.netflix.eureka.cluster.HttpReplicationClient
:用于发起复制请求的通信客户端com.netflix.eureka.util.batcher.TrafficShaper
:流量整形,可以根据之前的客户端请求结果(拥塞或网络错误),调整下次请求的时间。com.netflix.eureka.util.batcher.AcceptorExecutor
:内部有一个线程,通过死循环根据TrafficShaper
调整调度时间分发任务。com.netflix.eureka.util.batcher.TaskExecutors
:用于执行任务,持有一定数量的工作线程,使用com.netflix.eureka.util.batcher.AcceptorExecutor#requestWorkItems
方法从AcceptorExecutor
实例中请求任务以执行,在处理失败后,通过com.netflix.eureka.util.batcher.AcceptorExecutor#reprocess(com.netflix.eureka.util.batcher.TaskHolder<ID,T>, com.netflix.eureka.util.batcher.TaskProcessor.ProcessingResult)
调用进行重新处理和注册错误到TrafficShaper
中com.netflix.eureka.cluster.protocol.ReplicationList
:之前提交到TaskDispatcher
的多个ReplicationTask
被com.netflix.eureka.cluster.ReplicationTaskProcessor#createReplicationListOf
方法合并成了一个批量任务,这个批量任务作为请求体用于发起一次批量请求
以上机制中,批量任务和 TrafficShaper
处理网络通信的复杂问题,减少了流量,处理拥塞和网络问题。
而 com.netflix.eureka.resources.InstanceResource#validateDirtyTimestamp
这个方法返回的状态码是给发起复制的客户端根据状态码处理数据冲突的方式。
复制的接收端
回顾之前相关的注册、续约、下线等逻辑,可以看到,都有 isReplication
值表示这次的请求是否是复制请求。这些请求处理时,都对这个变量进行了一些处理,主要是值不同,涉及的统计数据维护不同。重要的是,如果是复制请求,则 PeerAwareInstanceRegistryImpl
不会再将这次请求复制到其他对等节点(防止重复的复制)。
关于批量请求的接收在于 com.netflix.eureka.resources.PeerReplicationResource#batchReplication
中,其中会根据任务的类型分别调用单个请求的处理方法来处理。
com.netflix.eureka.resources.InstanceResource#validateDirtyTimestamp
尝试给冲突的数据,返回提示性的状态码。
总结及参考资料还请移步前篇