【Spring Cloud】Eureka-Server 源码解读|周末学习

本文已参与周末学习计划,点击链接查看详情 链接

一、前言

看源码:抓大放小,先主流程,再细枝末节。

用技巧连蒙带猜:

  1. 看方法名(英文名)
  2. 看注解

eureka 运行的核心的流程:

  1. 服务注册:eureka clienteureka server 注册的过程
  2. 服务发现:eureka clienteureka server 获取注册表的过程
  3. 服务心跳:eureka client 定时往 eureka server 发送续约通知(心跳)
  4. 服务实例摘除
  5. 通信
  6. 限流
  7. 自我保护
  8. server 集群:eureka-server 之间相互注册

eureka server :提供注册中心的功能。

前提需知:eureka-server 是一个 web 应用,可以打成 war包,在 tomcat 里启动。

(1)源码着手点

源码着手点可分为两点:

  1. eureka-serverbuild.gradle:各种依赖和构建所需的配置

  2. eureka-serverweb.xmlweb 应用最核心之处,定义各种 listenerfilter

1)从 build.gradle 可看出

文件路径:eureka-server/build.gradle

可以看到,依赖的模块有:

  1. eureka-client:在集群模式中,每个 server 也是一个 client,可以互相注册。

  2. eureka-core:注册中心的核心角色,接收服务注册请求,提供服务发现的功能,保持心跳(续约请求),摘除故障服务实例。

  3. jerseyRESTful HTTP 框架,eureka clienteureka server 之间进行通信,都是基于 jersey

    例如:eureka-client-jersey2eureka-core-jersey2

  4. 等等,其他可从 build.gradle 查阅

2)从 web.xml 可看出

文件路径:eureka-server/src/main/webapp/WEB-INF/web.xml

web.xml 可看出:

  1. listener 监听器: 负责 web 应用初始化; 例如,启动后台线程去加载配置文件

    对应 eureka-server 里,就是 com.netflix.eureka.EurekaBootStrap

  2. filter 过滤器:对请求进行处理

核心的 filter 有四个:

当然,每个 filterweb.xml 也定义了相对应的 URL 匹配。

  • StatusFilter:负责状态相关的处理逻辑

  • ServerRequestAuthFilter:授权认证相关的处理逻辑

  • RateLimitingFilter:负责限流相关的处理逻辑

  • GzipEncodingEnforcingFiltergzip,压缩相关的

最后一个是 jerseyServletContainer: 接收所有的请求,作为请求的入口

<filter>
    <filter-name>jersey</filter-name>
    <filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
    ... ...
</filter>
复制代码

(2)一些概念

读源码准备,Eureka 的一些概念:

  1. Register 服务注册
  2. Renew 服务续约
  3. Fetch Registries 获取服务注册列表信息
  4. Cancel 服务下线
  5. Eviction 服务剔除

二、从源码中学到了什么

凡凡觉得 “学到了” 的东西。

(1)线程池的使用

学习他人怎么使用线程池,方便以后自己写架构时候,拿来就用。

源码中线程池有:

  1. 调度线程池

  2. 心跳线程池

  3. 缓存刷新线程池

  4. 调度线程池

    private final ScheduledExecutorService scheduler;
    // 支持调度的线程池,核心线程数 2个,使用 google 工具包中线程工厂,后台线程
    scheduler = Executors.newScheduledThreadPool(
        2, new ThreadFactoryBuilder()
        .setNameFormat("DiscoveryClient-%d")
        .setDaemon(true)
        .build());
    复制代码
  5. 心跳线程池

    private final ThreadPoolExecutor heartbeatExecutor;
    // 支持心跳的线程池: 核心线程数1个,默认最大线程数5个,且线程存活时间为0,后台线程
    heartbeatExecutor = new ThreadPoolExecutor(
        1, 5, 0, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder()
        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
        .setDaemon(true)
        .build()
    复制代码
  6. 缓存刷新线程池

    private final ThreadPoolExecutor cacheRefreshExecutor;
    // 支持缓存刷新的线程池: 核心线程数1个,默认最大线程数5个,且线程存活时间为0,后台线程
    cacheRefreshExecutor = new ThreadPoolExecutor(
        1, 5, 0, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadFactoryBuilder()
        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
        .setDaemon(true)
        .build()
    ); 
    复制代码

(3)循环队列

com.netflix.eureka.registry.AbstractInstanceRegistry.java 里定义两个循环队列。

// 最近注册的队列
private final CircularQueue<Pair<Long, String>> recentRegisteredQueue;
// 最近取消的队列
private final CircularQueue<Pair<Long, String>> recentCanceledQueue;
复制代码
private class CircularQueue<E> extends ConcurrentLinkedQueue<E> {
    private int size = 0;

    public CircularQueue(int size) {
        this.size = size;
    }

    @Override
    public boolean add(E e) {
        this.makeSpaceIfNotAvailable();
        return super.add(e);
    }

    private void makeSpaceIfNotAvailable() {
        if (this.size() == size) {
            this.remove();
        }
    }

    public boolean offer(E e) {
        this.makeSpaceIfNotAvailable();
        return super.offer(e);
    }
}
复制代码

三、直接怼源码

(1)初始化

eureka-server 初始化时,就会触发这个监听器:

eureka-server/src/main/webapp/WEB-INF/web.xml 定义了

<listener>
    <listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
</listener>
复制代码

而这个监听器的执行初始化的方法,contextInitialized() 方法,源码如下:

// 定位:eureka-core/src/main/java/com/netflix/eureka/EurekaBootStrap.java
@Override
public void contextInitialized(ServletContextEvent event) {
    try {
        // 1. 初始化环境变量
        initEurekaEnvironment();
        // 2. 初始化上下文
        initEurekaServerContext();

        ServletContext sc = event.getServletContext();
        sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
    } catch (Throwable e) {
        logger.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}
复制代码

目前源码着手点,可分为两点:

  1. 初始化环境变量 initEurekaEnvironment()

  2. 初始化上下文 initEurekaServerContext()

1)初始化环境变量 initEurekaEnvironment()

主要过程步骤如下:

  1. 创建 ConcurrentCompositeConfiguration :代表了所谓的配置,包括了 eureka 需要的所有的配置

    1.1 在创建时候,先调用了 clear() 方法

    1.2 之后,fireEvent() 发布了一个事件(EVENT_CLEAR

  2. ConcurrentCompositeConfiguration config 实例加入了一堆别配置

    2.1 初始化数据中心的配置,如果没有配置的话,就是 DEFAULT data center

    2.2 初始化 eureka 运行的环境,如果没有配置,默认设置为 test 环境

进入方法,查看对应源码:

// 定位:eureka-core/src/main/java/com/netflix/eureka/EurekaBootStrap.java
protected void initEurekaEnvironment() throws Exception {
    logger.info("Setting the eureka configuration..");

    // 1. 重要的是:
    String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
    
    // 2. 其他别的配置
    ... ...
}
复制代码

进入 ConfigurationManager.getConfigInstance(),可看到:

// 定位:com.netflix.config.ConfigurationManager.java
public static AbstractConfiguration getConfigInstance() {
    if (instance == null) {
        synchronized (ConfigurationManager.class) {
            if (instance == null) {
                instance = getConfigInstance(Boolean.getBoolean(DynamicPropertyFactory.DISABLE_DEFAULT_CONFIG));
            }
        }
    }
    return instance;
}
复制代码

初始化配置管理器(ConfigurationManager):采用 双检查锁(double check)和volatile单例模式 来实现线程安全。

进入 getConfigInstance() 方法:

// 定位:com.netflix.config.ConfigurationManager.java
private static AbstractConfiguration getConfigInstance(boolean defaultConfigDisabled) {
    if (instance == null && !defaultConfigDisabled) {
        // 1. 创建 ConcurrentCompositeConfiguration
        instance = createDefaultConfigInstance();
        registerConfigBean();
    }
    return instance;
}

private static AbstractConfiguration createDefaultConfigInstance() {
    // 创建主要配置
    ConcurrentCompositeConfiguration config = new ConcurrentCompositeConfiguration();  
    
    // 加载其他配置
    ... ...
        
    return config;
}
// 定位:com.netflix.config.ConcurrentCompositeConfiguration.java
// 1.1. 创建主要配置时,会先清理
public ConcurrentCompositeConfiguration() {
    clear();
}
public final void clear() {
    // 1.2 发布事件
    fireEvent(EVENT_CLEAR, null, null, true);
    
    ... ... 
        
    fireEvent(EVENT_CLEAR, null, null, false);
    ... ...
}
复制代码

2)初始化上下文 initEurekaServerContext()

主要过程步骤如下:

  1. 第一步:加载 eureka-server 文件配置
  2. 第二步:初始化 eureka-server 内部的 eureka-client(用来跟其他的 eureka-server 节点进行注册和通信)
  3. 第三步:处理注册相关的事情
  4. 第四步:处理 peer 相关节点
  5. 第五步:完成 eureka-server 上下文(context)的构建
  6. 第六步:处理一些善后的事情,从相邻的 eureka 节点拷贝注册信息
  7. 第七步:注册所有监控
protected void initEurekaServerContext() throws Exception {
    // 第一步:加载 eureka-server 文件配置
    EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();

    // 第二步:初始化 eureka-server 内部的 eureka-client
    //(跟其他的 eureka-server 节点进行注册和通信)
    ApplicationInfoManager applicationInfoManager = null;

    // 第三步:处理注册相关的事情
    PeerAwareInstanceRegistry registry;

    // 第四步:处理 peer 相关节点
    PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes();

    // 第五步:完成 eureka-server 上下文(context)的构建
    serverContext = new DefaultEurekaServerContext();

    // 第六步:处理一些善后的事情,从相邻的 eureka 节点拷贝注册信息
    int registryCount = registry.syncUp();
    registry.openForTraffic(applicationInfoManager, registryCount);

    // 第七步:注册所有监控
    EurekaMonitors.registerAllStats();
}
复制代码

下面进入主要流程:


第一步:加载 eureka-server 文件配置

加载 eureka-server.properties 的过程:

  1. 创建了一个DefaultEurekaServerConfig 对象,创建时会调用 init() 方法

    DefaultEurekaServerConfig 提供的获取配置项的各个方法,都是通过硬编码的配置项名称,从DynamicPropertyFactory 中获取配置项的值,DynamicPropertyFactory 又是从ConfigurationManager 那儿获取来的,所以也包含了所有配置项的值。

// 1. init()
private void init() {
    // 拿到当前的环境:eureka.environment
    String env = ConfigurationManager.getConfigInstance().getString(
        EUREKA_ENVIRONMENT, TEST);
    // 设置当前属性
    ConfigurationManager.getConfigInstance().setProperty(
        ARCHAIUS_DEPLOYMENT_ENVIRONMENT, env);
    // 加载 eureka 配置文件的名字: eureka-server
    String eurekaPropsFile = EUREKA_PROPS_FILE.get();
    // 2. 加载配置:eureka-server.properties 
    ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile);
}
复制代码
  1. eureka-server.properties 中的配置加载到了一个 Properties 对象中,然后将 Properties 对象中的配置放到 ConfigurationManager 中去,此时 ConfigurationManager 中去就有了所有的配置了。

    eureka-server.properties默认没有配置,所以读取的都是默认值。

第二步:初始化 eureka-server 内部的 eureka-client
ApplicationInfoManager applicationInfoManager = null;

if (eurekaClient == null) {
    // 1. 将 eureka-client.properties 配置加载到 ConfigurationManager
    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
        ? new CloudInstanceConfig()
        : new MyDataCenterInstanceConfig();

    // 2. 这期间会构建 InstanceInfo(服务实例的实例本身信息)
    // 基于 EurekaInstanceConfig 和 InstanceInfo 构建了 ApplicationInfoManager
    // ApplicationInfoManager:是对服务实例进行管理
    applicationInfoManager = new ApplicationInfoManager(
        instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    // 3. 创建 eureka-client 配置项
    EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
    
    // 4. 创建 EurekaClient,DiscoveryClient 是其子类
    eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
    applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
复制代码
  1. 加载服务实例相关配置:将 eureka-client.properties 配置加载到 ConfigurationManager,并提供 EurekaInstanceConfig

    详细可看:PropertiesInstanceConfig

    问题:这里不是初始化 eureka-server 嘛?为什么要加载 eureka-client

    eureka-server 同时也是一个 eureka-client,因为他可能要向其他的 eureka-server 去进行注册,从而组成一个 eureka-server 的集群。

  2. 基于 EurekaInstanceConfigInstanceInfo 构建了 ApplicationInfoManager

  3. 创建 eureka-client 配置项

  4. 重点:创建 EurekaClientDiscoveryClient 是其子类

    // 定位:com.netflix.discovery.DiscoveryClient.java
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        ... ...
    
        // 1. 读取 EurekaClientConfig,包括 TransportConfig
        clientConfig = config;
        transportConfig = config.getTransportConfig();
    
        // 2. 保存 EurekaInstanceConfig 和 InstanceInfo
        instanceInfo = myInfo;
        ... ...
    
        // 3. 是否需要抓取注册表
        if (config.shouldFetchRegistry()) {... ...}
    
        // 3. 是否需要注册到 eureka
        if (config.shouldRegisterWithEureka()) {... ...}
    
        try {
            // 4. 支持调度的线程池,核心线程数 2个,使用 google 工具包中线程工厂
            scheduler = Executors.newScheduledThreadPool(... ...);
    
            // 5. 支持心跳的线程池: 核心线程数1个,默认最大线程数5个,且线程存活时间为0
            heartbeatExecutor = new ThreadPoolExecutor(... ...);
    
            // 6. 支持缓存刷新的线程池: 核心线程数1个,默认最大线程数5个,且线程存活时间为0
            cacheRefreshExecutor = new ThreadPoolExecutor(... ...); 
    
            // 7. 创建 EurekaTransport:
            // 支持底层的 eureka-client 跟 eureka-server 进行网络通信的组件
            eurekaTransport = new EurekaTransport();
            ... ...
        }
        // 8. 抓取注册表
        // 这里代码写的不好,难以懂得
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
    
        // 9. 初始化调度任务
        initScheduledTasks();
        ... ...
    }
    复制代码

    步骤如下:

    1. 读取 EurekaClientConfig,包括 TransportConfig
    2. 保存 EurekaInstanceConfigInstanceInfo
    3. 是否要注册以及抓取注册表,如果不要的话,释放一些资源
    4. 支持调度的线程池
    5. 支持心跳的线程池
    6. 支持缓存刷新的线程池
    7. 创建EurekaTransport:支持底层的 eureka-clienteureka-server 进行网络通信的组件
    8. 抓取注册表
    9. 初始化调度任务
      1. 如果要抓取注册表的话,就会注册一个定时任务,按照设定的抓取的间隔(默认是30s),在调度线程池去执行 CacheRefreshThread
      2. 如果要向 eureka-server 进行注册的话,启动定时任务,每隔一定时间发送心跳,执行 HeartbeatThread
      3. 创建了服务实例副本传播器,将自己作为一个定时任务进行调度
      4. 创建了服务实例的状态变更的监听器,如果配置了监听,那么就会注册监听器
第三步:处理注册相关的事情
第四步:处理 peer 相关节点
第五步:完成 eureka-server 上下文的构建
第六步:处理善后的事情,从相邻的 eureka 节点拷贝注册信息
第七步:注册所有监控

(2)eureka-server 如何完成服务注册?

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享