简介
Eureka是Netflix开发的服务注册与发现框架。Spring Cloud集成了Eureka作为其默认的也是推荐的服务注册中心组件,并提供了开箱即用的支持。Eureka包括两个组件:Eureka Client和Eureka Server。
架构
Eureka Server:表示注册中心集群的节点
Eureka Client:表示Eureka客户端
us-east-xx:表示节点所在区域
Application Service:表示服务提供者
Application Client:表示服务调用者
特点
Eureka注册中心保证AP,即可用性和分区容错性,不保证强一致性。因此Eureka可以很好地应对因网络故障导致部分节点失去联系的情况。与Eureka不同,ZooKeeper保证CP,在master节点因为网络故障与其它节点失去联系时,剩余节点会进行较长时间的“选举”,期间zk集群都是不可用的。
注册服务更重要的是可用性,我们可以接受短期内达不到一致性的状况
再来复习一下分布式系统中的CAP原则:
一致性(C):数据一致更新,所有数据变动都是同步的。
可用性(A):只要收到用户请求,就必须能响应。
分区容忍性(P):分布式系统分区之间通信可能失败,不可避免。
Eureka中一些概念
-
Register:服务注册。当Eureka Client向Eureka Server注册时,它提供自身的元数据,比如IP、端口、运行状况指示符等。
-
Renew:服务续约。Eureka Client每隔30s会发送一次心跳来续约,告诉Eureka Server该Client仍然在线。
-
Fetch Registries:获取注册列表信息。Eureka Client从Eureka Server获取注册表信息,并将其缓存在本地。Client使用该信息查找其它服务,从而进行远程调用。该注册表信息定期(30s)更新一次。
-
Cancel:服务下线。Eureka Client在程序关闭时向Eureka服务器发送取消请求。发送请求后,该客户端实例信息将从服务器的实例注册表中删除。该下线请求不会自动完成,需要主动调用以下接口:
DiscoveryManager.getInstance().shutdownComponent(); 复制代码
-
Evict:服务剔除。默认情况下,当Eureka Client连续90s没有向Eureka Server发送服务续约,即心跳,Eureka Server会将该服务实例从注册列表中删除。
-
Peer Node:Eureka Server集群中节点不分主从,完全对等。
Eureka Server源码分析
下面主要涉及到Eureka Server集群中,Server启动、集群节点发现和节点间信息同步功能的实现。
加载方式
建立Eureka时只需要引入此依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
复制代码
(1)在这个包META-INF下面有个spring.factories这个配置文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
复制代码
说明Eureka Server是通过autoconfiguration的方式加载的。
在EurekaServerAutoConfiguration
类的注解上出现了条件注解:
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
复制代码
(2)此外,Eureka Server的创建还需要@EnableEurekaServer
注解,该注解的作用是引入EurekaServerMarkerConfiguration
类。在EurekaServerMarkerConfiguration
类中创建了EurekaServerMarkerConfiguration.Marker
类型的bean。
从而,当我们引入spring-cloud-starter-netflix-eureka-server
包,并且加入了@EnableEurekaServer
注解,spring boot就可以自动配置Eureka Server了。
EurekaServerAutoConfiguration
用于往spring容器中添加Eureka Server相关功能bean供组装。
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
// 仅抽取关键代码片段
// 注册一个名为Eureka Server的namedFeatures
@Bean
public HasFeatures eurekaServerFeature() {
return HasFeatures.namedFeature("Eureka Server",
EurekaServerAutoConfiguration.class);
}
// eureka.server前缀的配置bean
@Configuration
protected static class EurekaServerConfigBeanConfiguration {
@Bean
@ConditionalOnMissingBean
public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
EurekaServerConfigBean server = new EurekaServerConfigBean();
if (clientConfig.shouldRegisterWithEureka()) {
// Set a sensible default if we are supposed to replicate
server.setRegistrySyncRetries(5);
}
return server;
}
}
// 默认开启EurekaController,提供获取Eureka Server信息的接口
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
// 对等Eureka Server实例注册表
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
// 对等Eureka Server节点集合,即同一个集群中的节点,PeerEurekaNodes中维护了一个对等节点列表
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
/** 当eureka.client.use-dns-for-fetching-service-urls == false,且以下配置之一改变时,更新peers
* eureka.client.availability-zones
* eureka.client.region
* eureka.client.service-url.<zone>
*/
static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
implements ApplicationListener<EnvironmentChangeEvent> {
public RefreshablePeerEurekaNodes(
final PeerAwareInstanceRegistry registry,
final EurekaServerConfig serverConfig,
final EurekaClientConfig clientConfig,
final ServerCodecs serverCodecs,
final ApplicationInfoManager applicationInfoManager) {
super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
}
@Override
public void onApplicationEvent(final EnvironmentChangeEvent event) {
if (shouldUpdate(event.getKeys())) {
updatePeerEurekaNodes(resolvePeerUrls());
}
}
/*
* Check whether specific properties have changed.
*/
protected boolean shouldUpdate(final Set<String> changedKeys) {
assert changedKeys != null;
// if eureka.client.use-dns-for-fetching-service-urls is true, then
// service-url will not be fetched from environment.
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
return false;
}
if (changedKeys.contains("eureka.client.region")) {
return true;
}
for (final String key : changedKeys) {
// property keys are not expected to be null.
if (key.startsWith("eureka.client.service-url.") ||
key.startsWith("eureka.client.availability-zones.")) {
return true;
}
}
return false;
}
}
// Eureka Server上下文,组装了EurekaServerConfig、PeerAwareInstanceRegistry、PeerEurekaNodes、ApplicationInfoManager、ServerCodecs并提供getter
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
// EurekaServerBootstrap是spring-cloud和原生eureka的胶水代码,通过此类启动Eureka Server,让Eureka跑在Embedded Tomcat中。此类会在EurekaServerInitializerConfiguration中被调用,进行eureka启动。
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
// 注册Jersey过滤器,Eureka采用Jersey框架来实现对外的restFul接口
/**
* Register the Jersey filter
*/
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
// 注册httpTraceFilter
@Bean
public FilterRegistrationBean traceFilterRegistration(
@Qualifier("httpTraceFilter") Filter filter) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(filter);
bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
return bean;
}
}
复制代码
EurekaServerInitializerConfiguration
该类是Eureka Server程序执行入口。isAutoStartup返回ture,Spring容器初始化完成后会自动执行该类的start()方法。start()方法中开启了新线程来执行初始化动作。
@Configuration
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
// 仅抽取关键代码片段
// 启动Eureka Server
@Override
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
// 初始化上下文
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
// 发布EurekaServer的注册事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
// 修改running状态
EurekaServerInitializerConfiguration.this.running = true;
// 发送Eureka Start 事件
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
}
复制代码
EurekaServerBootstrap
前面说了,通过调用此类启动Eureka Server,其实EurekaServerBootstrap代码基本从EurekaBootStrap中拷贝的,将ServletContextListener方式启动改为在Spring容器的生命周期中启动。
public class EurekaServerBootstrap {
// 仅抽取关键代码片段
// Eureka Server运转需要的配置信息
protected EurekaServerConfig eurekaServerConfig;
// 向Eureka Server注册和被其它组件发现所需的初始化信息的类
protected ApplicationInfoManager applicationInfoManager;
// Eureka Client向Eureka Server注册实例时需要的配置信息
protected EurekaClientConfig eurekaClientConfig;
// 实例注册表
protected PeerAwareInstanceRegistry registry;
// EurekaServer上下文
protected volatile EurekaServerContext serverContext;
// 在上面的EurekaServerInitializerConfiguration#start()中被调用
public void contextInitialized(ServletContext context) {
try {
// 初始化Eureka环境
initEurekaEnvironment();
// 初始化EurekaServer上下文
initEurekaServerContext();
// 将EurekaServer上下文设置进Servlet上下文
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
// EurekaServerInitializerConfiguration stop()时会调用此方法销毁上下文及环境
public void contextDestroyed(ServletContext context) {
try {
log.info("Shutting down Eureka Server..");
context.removeAttribute(EurekaServerContext.class.getName());
destroyEurekaServerContext();
destroyEurekaEnvironment();
}
catch (Throwable e) {
log.error("Error shutting down eureka", e);
}
log.info("Eureka Service is now shutdown...");
}
// 初始化Eureka环境变量,即从配置文件中读取并设置datacenter和enviroment的配置参数
protected void initEurekaEnvironment() throws Exception {
log.info("Setting the eureka configuration..");
String dataCenter = ConfigurationManager.getConfigInstance()
.getString(EUREKA_DATACENTER);
if (dataCenter == null) {
log.info(
"Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
String environment = ConfigurationManager.getConfigInstance()
.getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
log.info(
"Eureka environment value eureka.environment is not set, defaulting to test");
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
}
}
// 初始化EurekaServer上下文
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
// 以上是AWS的相关逻辑
// EurekaServerContextHolder是一个EurekaServer上下文的静态持有者,单例。这里初始化EurekaServerContextHolder,即将EurekaServerContext赋给这个Holder。
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
// 从Eureka Server集群中的相邻节点复制全部注册信息
int registryCount = this.registry.syncUp();
// 将状态置为UP,即允许该区域下的服务向其注册
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
// 注册所有监控统计信息
EurekaMonitors.registerAllStats();
}
}
复制代码
PeerAwareInstanceRegistryImpl
Eureka Server启动时,EurekaServerBootstrap调用PeerAwareInstanceRegistryImpl#syncUp(),从相邻节点同步服务注册信息。
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
...
// 从相邻DS节点复制完整的服务信息,并注册到自己的注册表里
/**
* Populates the registry information from a peer eureka node. This
* operation fails over to other nodes until the list is exhausted if the
* communication fails.
*/
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
// 最大同步重试次数 serverConfig.getRegistrySyncRetries()
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
// 同步失败后的重试间隔 serverConfig.getRegistrySyncRetryWaitMs()
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 首先从EurekaClient#getApplications()获取所有注册的服务信息
// 这里可以看到所有注册信息封装在Applications里,Applications里包含了所有服务;Application封装了一个服务的注册信息,Application里包含了该服务下的所有实例;InstanceInfo封装了实例信息。
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// 注册服务实例
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
// 设置本实例状态,指示是否准备好接收通信,并通知其它监听者状态改变事件
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// postInit中开启了节点剔除的定时任务
super.postInit();
}
}
复制代码
EurekaClient
在上面的syncUp方法中调用了eurekaClient.getApplications(),那么EurekaClient是在哪里注入的呢?
spring-cloud-starter-netflix-eureka-server
中间接引入了spring-cloud-netflix-eureka-client
,该jar包下的spring.factories里引入了自动配置类。一系列条件下来,配置了CloudEurekaClient
这个bean,CloudEurekaClient
是EurekaClient
的子类。
至于eurekaClient里的数据是何时写入的这里先不管,我们可以猜想每个Eureka Server其实也有一个Eureka Client,通过内置的Eureka Client来向其它Eureka Server节点获取服务信息。
PeerEurekaNodes
对等Eureka Server节点集合,这个类是对Eureka Server集群节点的抽象。启动后定时维护当前节点的所有相邻节点信息,PeerEurekaNode
是一个节点的抽象。
EurekaServerBootstrap
中注入了EurekaServerContext
。在EurekaServerAutoConfiguration
中定义了EurekaServerContext
类型的bean——其实现类DefaultEurekaServerContext
。DefaultEurekaServerContext
的@PostConstruct
方法中调用了peerEurekaNodes.start()
。
public class PeerEurekaNodes {
// 仅抽取关键代码片段
protected final PeerAwareInstanceRegistry registry;
// 集群节点集合
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
// 集群节点url集合
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
// 定时任务线程池
private ScheduledExecutorService taskExecutor;
// 启动定时任务线程来updatePeerEurekaNodes
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 更新节点信息
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
// 关闭集群
public void shutdown() {
taskExecutor.shutdown();
List<PeerEurekaNode> toRemove = this.peerEurekaNodes;
// 清空节点及URL列表
this.peerEurekaNodes = Collections.emptyList();
this.peerEurekaNodeUrls = Collections.emptySet();
// 关闭每个节点
for (PeerEurekaNode node : toRemove) {
node.shutDown();
}
}
// 获取集群中除去当前节点外所有节点的URL
/**
* Resolve peer URLs.
*
* @return peer URLs with node's own URL filtered out
*/
protected List<String> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List<String> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
/**
* Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and
* create new ones.
*
* @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out
*/
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls); // 要移除并停止的节点
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls); // 要新增的节点
// 没有要移除或新增的URL直接返回
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// 移除并停止
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// 增加新节点
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
// 根据URL构造PeerEurekaNode
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
// 创建http客户端,用来与对等节点通信,同步数据
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
// 返回构造的节点
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
}
复制代码
PeerEurekaNode
从上面发现Eureka用PeerEurekaNode表示对等节点,使用http客户端在节点之间通信。接下来通过查看PeerEurekaNode来看集群节点之间如何同步信息。
public class PeerEurekaNode {
// 仅抽取关键代码片段
// 批量任务分配器
private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
// 非批量任务分配器
private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;
// 当有新服务注册到当前节点时,通过此方法将服务注册信息同步到peer node
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// 任务分配器中添加一个注册新服务的同步任务
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
// 当有服务取消(下线)时,通过此方法将信息同步到peer node
public void cancel(final String appName, final String id) throws Exception {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
// 添加取消任务
batchingDispatcher.process(
taskId("cancel", appName, id),
new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.cancel(appName, id);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
}
}
},
expiryTime
);
}
// 心跳同步任务,当前节点有服务发送心跳续租,将信息同步到peer node
public void heartbeat(final String appName, final String id,
final InstanceInfo info, final InstanceStatus overriddenStatus,
boolean primeConnection) throws Throwable {
if (primeConnection) {
// We do not care about the result for priming request.
replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
}
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
if (info != null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
register(info);
}
} else if (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
}
复制代码
ApplicationResource
此类是服务注册、查询的接口Controller。
addInstance() // 注册指定实例的信息
getInstanceInfo() // 获取指定应用实例信息
getApplication // 获取指定应用信息
复制代码
小结
Eureka Server启动时,会从集群DS节点获取服务的注册信息注册在自己的注册表里,然后将自己的状态置为UP等待本区域的服务过来注册、续租及查询,然后开启一个服务剔除的task定时清除没有在规定时间里向该节点heartBeat的实例。
另外还会启动定时任务来维护相邻节点。
Eureka Client源码分析
加载方式
上面已经说过,spring-cloud-netflix-eureka-client
通过SpringFactoriesLoader的方式加载。此文件中加载了多个类:
# 自动加载的类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
# 从bootstrap.yml中读取配置来初始化上下文
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
复制代码
问题
(1)虽然Eureka可以部署集群架构,但是集群中每个Eureka实例都是对等的。每个Eureka实例都包含了全部的服务注册表,每个Eureka实例接收到了服务注册/下线等请求的时候,会同步转发给集群中其它的Eureka实例,实现集群数据的同步。如果是超大规模的服务集群,最后服务注册表有可能超过单机内存支撑的极限。
(2)2018年7月,Netflix官方宣布Eureka 2.0停止维护。