Eureka服务注册与发现

简介

Eureka是Netflix开发的服务注册与发现框架。Spring Cloud集成了Eureka作为其默认的也是推荐的服务注册中心组件,并提供了开箱即用的支持。Eureka包括两个组件:Eureka Client和Eureka Server。

架构

Eureka Server:表示注册中心集群的节点

Eureka Client:表示Eureka客户端

us-east-xx:表示节点所在区域

Application Service:表示服务提供者

Application Client:表示服务调用者

特点

Eureka注册中心保证AP,即可用性和分区容错性,不保证强一致性。因此Eureka可以很好地应对因网络故障导致部分节点失去联系的情况。与Eureka不同,ZooKeeper保证CP,在master节点因为网络故障与其它节点失去联系时,剩余节点会进行较长时间的“选举”,期间zk集群都是不可用的。

注册服务更重要的是可用性,我们可以接受短期内达不到一致性的状况

再来复习一下分布式系统中的CAP原则:

一致性(C):数据一致更新,所有数据变动都是同步的。

可用性(A):只要收到用户请求,就必须能响应。

分区容忍性(P):分布式系统分区之间通信可能失败,不可避免。

Eureka中一些概念

  • Register:服务注册。当Eureka Client向Eureka Server注册时,它提供自身的元数据,比如IP、端口、运行状况指示符等。

  • Renew:服务续约。Eureka Client每隔30s会发送一次心跳来续约,告诉Eureka Server该Client仍然在线。

  • Fetch Registries:获取注册列表信息。Eureka Client从Eureka Server获取注册表信息,并将其缓存在本地。Client使用该信息查找其它服务,从而进行远程调用。该注册表信息定期(30s)更新一次。

  • Cancel:服务下线。Eureka Client在程序关闭时向Eureka服务器发送取消请求。发送请求后,该客户端实例信息将从服务器的实例注册表中删除。该下线请求不会自动完成,需要主动调用以下接口:

    DiscoveryManager.getInstance().shutdownComponent();
    复制代码
  • Evict:服务剔除。默认情况下,当Eureka Client连续90s没有向Eureka Server发送服务续约,即心跳,Eureka Server会将该服务实例从注册列表中删除。

  • Peer Node:Eureka Server集群中节点不分主从,完全对等。

Eureka Server源码分析

下面主要涉及到Eureka Server集群中,Server启动、集群节点发现和节点间信息同步功能的实现。

加载方式

建立Eureka时只需要引入此依赖:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
复制代码

(1)在这个包META-INF下面有个spring.factories这个配置文件:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
复制代码

说明Eureka Server是通过autoconfiguration的方式加载的。

EurekaServerAutoConfiguration类的注解上出现了条件注解:

@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
复制代码

(2)此外,Eureka Server的创建还需要@EnableEurekaServer注解,该注解的作用是引入EurekaServerMarkerConfiguration类。在EurekaServerMarkerConfiguration类中创建了EurekaServerMarkerConfiguration.Marker类型的bean。

从而,当我们引入spring-cloud-starter-netflix-eureka-server包,并且加入了@EnableEurekaServer注解,spring boot就可以自动配置Eureka Server了。

EurekaServerAutoConfiguration

用于往spring容器中添加Eureka Server相关功能bean供组装。

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {

  	// 仅抽取关键代码片段

  	// 注册一个名为Eureka Server的namedFeatures
	@Bean
	public HasFeatures eurekaServerFeature() {
		return HasFeatures.namedFeature("Eureka Server",
				EurekaServerAutoConfiguration.class);
	}

  	// eureka.server前缀的配置bean
	@Configuration
	protected static class EurekaServerConfigBeanConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
			EurekaServerConfigBean server = new EurekaServerConfigBean();
			if (clientConfig.shouldRegisterWithEureka()) {
				// Set a sensible default if we are supposed to replicate
				server.setRegistrySyncRetries(5);
			}
			return server;
		}
	}

  	// 默认开启EurekaController,提供获取Eureka Server信息的接口
	@Bean
	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
	public EurekaController eurekaController() {
		return new EurekaController(this.applicationInfoManager);
	}

  	// 对等Eureka Server实例注册表
	@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}

  	// 对等Eureka Server节点集合,即同一个集群中的节点,PeerEurekaNodes中维护了一个对等节点列表
	@Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
			ServerCodecs serverCodecs) {
		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
	}
	
  	/** 当eureka.client.use-dns-for-fetching-service-urls == false,且以下配置之一改变时,更新peers
  	 *  eureka.client.availability-zones
  	 *  eureka.client.region
  	 *  eureka.client.service-url.<zone>
  	 */
	static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
			implements ApplicationListener<EnvironmentChangeEvent> {

		public RefreshablePeerEurekaNodes(
				final PeerAwareInstanceRegistry registry,
				final EurekaServerConfig serverConfig,
				final EurekaClientConfig clientConfig, 
				final ServerCodecs serverCodecs,
				final ApplicationInfoManager applicationInfoManager) {
			super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
		}

		@Override
		public void onApplicationEvent(final EnvironmentChangeEvent event) {
			if (shouldUpdate(event.getKeys())) {
				updatePeerEurekaNodes(resolvePeerUrls());
			}
		}
		
		/*
		 * Check whether specific properties have changed.
		 */
		protected boolean shouldUpdate(final Set<String> changedKeys) {
			assert changedKeys != null;
			
			// if eureka.client.use-dns-for-fetching-service-urls is true, then
			// service-url will not be fetched from environment.
			if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
				return false;
			}
			
			if (changedKeys.contains("eureka.client.region")) {
				return true;
			}
			
			for (final String key : changedKeys) {
				// property keys are not expected to be null.
				if (key.startsWith("eureka.client.service-url.") ||
					key.startsWith("eureka.client.availability-zones.")) {
					return true;
				}
			}
			
			return false;
		}
	}

  	// Eureka Server上下文,组装了EurekaServerConfig、PeerAwareInstanceRegistry、PeerEurekaNodes、ApplicationInfoManager、ServerCodecs并提供getter
	@Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}

  	// EurekaServerBootstrap是spring-cloud和原生eureka的胶水代码,通过此类启动Eureka Server,让Eureka跑在Embedded Tomcat中。此类会在EurekaServerInitializerConfiguration中被调用,进行eureka启动。
	@Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
			EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}

  	// 注册Jersey过滤器,Eureka采用Jersey框架来实现对外的restFul接口
	/**
	 * Register the Jersey filter
	 */
	@Bean
	public FilterRegistrationBean jerseyFilterRegistration(
			javax.ws.rs.core.Application eurekaJerseyApp) {
		FilterRegistrationBean bean = new FilterRegistrationBean();
		bean.setFilter(new ServletContainer(eurekaJerseyApp));
		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
		bean.setUrlPatterns(
				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

		return bean;
	}

  	// 注册httpTraceFilter
	@Bean
	public FilterRegistrationBean traceFilterRegistration(
			@Qualifier("httpTraceFilter") Filter filter) {
		FilterRegistrationBean bean = new FilterRegistrationBean();
		bean.setFilter(filter);
		bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
		return bean;
	}
}
复制代码

EurekaServerInitializerConfiguration

该类是Eureka Server程序执行入口。isAutoStartup返回ture,Spring容器初始化完成后会自动执行该类的start()方法。start()方法中开启了新线程来执行初始化动作。

@Configuration
public class EurekaServerInitializerConfiguration
		implements ServletContextAware, SmartLifecycle, Ordered {

  	// 仅抽取关键代码片段
  
  	// 启动Eureka Server
	@Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					// 初始化上下文
                  	eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");

                  	// 发布EurekaServer的注册事件
					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                  	// 修改running状态
					EurekaServerInitializerConfiguration.this.running = true;
                  	// 发送Eureka Start 事件
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}
}
复制代码

EurekaServerBootstrap

前面说了,通过调用此类启动Eureka Server,其实EurekaServerBootstrap代码基本从EurekaBootStrap中拷贝的,将ServletContextListener方式启动改为在Spring容器的生命周期中启动。

public class EurekaServerBootstrap {

  	// 仅抽取关键代码片段

  	// Eureka Server运转需要的配置信息
  	protected EurekaServerConfig eurekaServerConfig;
  	// 向Eureka Server注册和被其它组件发现所需的初始化信息的类
	protected ApplicationInfoManager applicationInfoManager;
  	// Eureka Client向Eureka Server注册实例时需要的配置信息
	protected EurekaClientConfig eurekaClientConfig;
  	// 实例注册表
	protected PeerAwareInstanceRegistry registry;
  	// EurekaServer上下文
	protected volatile EurekaServerContext serverContext;
  
  	// 在上面的EurekaServerInitializerConfiguration#start()中被调用
	public void contextInitialized(ServletContext context) {
		try {
          	// 初始化Eureka环境
			initEurekaEnvironment();
          	// 初始化EurekaServer上下文
			initEurekaServerContext();

          	// 将EurekaServer上下文设置进Servlet上下文
			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}

  	// EurekaServerInitializerConfiguration stop()时会调用此方法销毁上下文及环境
	public void contextDestroyed(ServletContext context) {
		try {
			log.info("Shutting down Eureka Server..");
			context.removeAttribute(EurekaServerContext.class.getName());

			destroyEurekaServerContext();
			destroyEurekaEnvironment();

		}
		catch (Throwable e) {
			log.error("Error shutting down eureka", e);
		}
		log.info("Eureka Service is now shutdown...");
	}

  	// 初始化Eureka环境变量,即从配置文件中读取并设置datacenter和enviroment的配置参数
	protected void initEurekaEnvironment() throws Exception {
		log.info("Setting the eureka configuration..");

		String dataCenter = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_DATACENTER);
		if (dataCenter == null) {
			log.info(
					"Eureka data center value eureka.datacenter is not set, defaulting to default");
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
		}
		String environment = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_ENVIRONMENT);
		if (environment == null) {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
			log.info(
					"Eureka environment value eureka.environment is not set, defaulting to test");
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
		}
	}

  	// 初始化EurekaServer上下文
	protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);

		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start();
		}
      	// 以上是AWS的相关逻辑

      	// EurekaServerContextHolder是一个EurekaServer上下文的静态持有者,单例。这里初始化EurekaServerContextHolder,即将EurekaServerContext赋给这个Holder。
		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		// Copy registry from neighboring eureka node
      	// 从Eureka Server集群中的相邻节点复制全部注册信息
		int registryCount = this.registry.syncUp();
      	// 将状态置为UP,即允许该区域下的服务向其注册
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		// Register all monitoring statistics.
      	// 注册所有监控统计信息
		EurekaMonitors.registerAllStats();
	}
}
复制代码

PeerAwareInstanceRegistryImpl

Eureka Server启动时,EurekaServerBootstrap调用PeerAwareInstanceRegistryImpl#syncUp(),从相邻节点同步服务注册信息。

public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
  	...

     // 从相邻DS节点复制完整的服务信息,并注册到自己的注册表里
	/**
     * Populates the registry information from a peer eureka node. This
     * operation fails over to other nodes until the list is exhausted if the
     * communication fails.
     */
    @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;

      	// 最大同步重试次数 serverConfig.getRegistrySyncRetries()
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                  	// 同步失败后的重试间隔 serverConfig.getRegistrySyncRetryWaitMs()
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
          	// 首先从EurekaClient#getApplications()获取所有注册的服务信息
          	// 这里可以看到所有注册信息封装在Applications里,Applications里包含了所有服务;Application封装了一个服务的注册信息,Application里包含了该服务下的所有实例;InstanceInfo封装了实例信息。
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                          	// 注册服务实例
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

  	// 设置本实例状态,指示是否准备好接收通信,并通知其它监听者状态改变事件
    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        this.expectedNumberOfRenewsPerMin = count * 2;
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
      	// postInit中开启了节点剔除的定时任务
        super.postInit();
    }
}
复制代码

EurekaClient

在上面的syncUp方法中调用了eurekaClient.getApplications(),那么EurekaClient是在哪里注入的呢?

spring-cloud-starter-netflix-eureka-server中间接引入了spring-cloud-netflix-eureka-client,该jar包下的spring.factories里引入了自动配置类。一系列条件下来,配置了CloudEurekaClient 这个bean,CloudEurekaClientEurekaClient的子类。

至于eurekaClient里的数据是何时写入的这里先不管,我们可以猜想每个Eureka Server其实也有一个Eureka Client,通过内置的Eureka Client来向其它Eureka Server节点获取服务信息。

PeerEurekaNodes

对等Eureka Server节点集合,这个类是对Eureka Server集群节点的抽象。启动后定时维护当前节点的所有相邻节点信息,PeerEurekaNode是一个节点的抽象。

EurekaServerBootstrap中注入了EurekaServerContext。在EurekaServerAutoConfiguration中定义了EurekaServerContext类型的bean——其实现类DefaultEurekaServerContextDefaultEurekaServerContext@PostConstruct方法中调用了peerEurekaNodes.start()

public class PeerEurekaNodes {

  	// 仅抽取关键代码片段

    protected final PeerAwareInstanceRegistry registry;
  	// 集群节点集合
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
  	// 集群节点url集合
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
  	// 定时任务线程池
    private ScheduledExecutorService taskExecutor;

  	// 启动定时任务线程来updatePeerEurekaNodes
    public void start() {
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
          	// 更新节点信息
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }

  	// 关闭集群
    public void shutdown() {
        taskExecutor.shutdown();
        List<PeerEurekaNode> toRemove = this.peerEurekaNodes;

      	// 清空节点及URL列表
        this.peerEurekaNodes = Collections.emptyList();
        this.peerEurekaNodeUrls = Collections.emptySet();

      	// 关闭每个节点
        for (PeerEurekaNode node : toRemove) {
            node.shutDown();
        }
    }

  	// 获取集群中除去当前节点外所有节点的URL
    /**
     * Resolve peer URLs.
     *
     * @return peer URLs with node's own URL filtered out
     */
    protected List<String> resolvePeerUrls() {
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) {
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }

    /**
     * Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and
     * create new ones.
     *
     * @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out
     */
    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }

        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls); // 要移除并停止的节点
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls); // 要新增的节点

      	// 没有要移除或新增的URL直接返回
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }

        // Remove peers no long available
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

      	// 移除并停止
        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }

      	// 增加新节点
        // Add new peers
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }

        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }

  	// 根据URL构造PeerEurekaNode
    protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
      	// 创建http客户端,用来与对等节点通信,同步数据
        HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
        String targetHost = hostFromUrl(peerEurekaNodeUrl);
        if (targetHost == null) {
            targetHost = "host";
        }
      	// 返回构造的节点
        return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
    }
}
复制代码

PeerEurekaNode

从上面发现Eureka用PeerEurekaNode表示对等节点,使用http客户端在节点之间通信。接下来通过查看PeerEurekaNode来看集群节点之间如何同步信息。

public class PeerEurekaNode {

  	// 仅抽取关键代码片段

  	// 批量任务分配器
	private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
  	// 非批量任务分配器
	private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;

  	// 当有新服务注册到当前节点时,通过此方法将服务注册信息同步到peer node
    public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
      	// 任务分配器中添加一个注册新服务的同步任务
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

  	// 当有服务取消(下线)时,通过此方法将信息同步到peer node
    public void cancel(final String appName, final String id) throws Exception {
        long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
      	// 添加取消任务
        batchingDispatcher.process(
                taskId("cancel", appName, id),
                new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                    @Override
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.cancel(appName, id);
                    }

                    @Override
                    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                        super.handleFailure(statusCode, responseEntity);
                        if (statusCode == 404) {
                            logger.warn("{}: missing entry.", getTaskName());
                        }
                    }
                },
                expiryTime
        );
    }

    // 心跳同步任务,当前节点有服务发送心跳续租,将信息同步到peer node
    public void heartbeat(final String appName, final String id,
                          final InstanceInfo info, final InstanceStatus overriddenStatus,
                          boolean primeConnection) throws Throwable {
        if (primeConnection) {
            // We do not care about the result for priming request.
            replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            return;
        }
        ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
            @Override
            public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
                return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
            }

            @Override
            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                super.handleFailure(statusCode, responseEntity);
                if (statusCode == 404) {
                    logger.warn("{}: missing entry.", getTaskName());
                    if (info != null) {
                        logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                                getTaskName(), info.getId(), info.getStatus());
                        register(info);
                    }
                } else if (config.shouldSyncWhenTimestampDiffers()) {
                    InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
                    if (peerInstanceInfo != null) {
                        syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                    }
                }
            }
        };
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
    }
}
复制代码

ApplicationResource

此类是服务注册、查询的接口Controller。

addInstance() // 注册指定实例的信息
getInstanceInfo() // 获取指定应用实例信息
getApplication // 获取指定应用信息
复制代码

小结

Eureka Server启动时,会从集群DS节点获取服务的注册信息注册在自己的注册表里,然后将自己的状态置为UP等待本区域的服务过来注册、续租及查询,然后开启一个服务剔除的task定时清除没有在规定时间里向该节点heartBeat的实例。

另外还会启动定时任务来维护相邻节点。

Eureka Client源码分析

加载方式

上面已经说过,spring-cloud-netflix-eureka-client通过SpringFactoriesLoader的方式加载。此文件中加载了多个类:

# 自动加载的类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
# 从bootstrap.yml中读取配置来初始化上下文
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
复制代码

问题

(1)虽然Eureka可以部署集群架构,但是集群中每个Eureka实例都是对等的。每个Eureka实例都包含了全部的服务注册表,每个Eureka实例接收到了服务注册/下线等请求的时候,会同步转发给集群中其它的Eureka实例,实现集群数据的同步。如果是超大规模的服务集群,最后服务注册表有可能超过单机内存支撑的极限。

(2)2018年7月,Netflix官方宣布Eureka 2.0停止维护。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享