本文已参与周末学习计划,点击链接查看详情 链接
一、前言
看源码:抓大放小,先主流程,再细枝末节。
用技巧连蒙带猜:
- 看方法名(英文名)
- 看注解
eureka
运行的核心的流程:
- 服务注册:
eureka client
往eureka server
注册的过程 - 服务发现:
eureka client
从eureka server
获取注册表的过程 - 服务心跳:
eureka client
定时往eureka server
发送续约通知(心跳) - 服务实例摘除
- 通信
- 限流
- 自我保护
server
集群:eureka-server
之间相互注册
eureka server
:提供注册中心的功能。
前提需知:
eureka-server
是一个web
应用,可以打成war
包,在tomcat
里启动。
(1)源码着手点
源码着手点可分为两点:
-
eureka-server
的build.gradle
:各种依赖和构建所需的配置 -
eureka-server
的web.xml
:web
应用最核心之处,定义各种listener
和filter
1)从 build.gradle
可看出
文件路径:
eureka-server/build.gradle
可以看到,依赖的模块有:
-
eureka-client
:在集群模式中,每个server
也是一个client
,可以互相注册。 -
eureka-core
:注册中心的核心角色,接收服务注册请求,提供服务发现的功能,保持心跳(续约请求),摘除故障服务实例。 -
jersey
:RESTful HTTP
框架,eureka client
和eureka server
之间进行通信,都是基于jersey
。例如:
eureka-client-jersey2
,eureka-core-jersey2
-
等等,其他可从
build.gradle
查阅
2)从 web.xml
可看出
文件路径:
eureka-server/src/main/webapp/WEB-INF/web.xml
从 web.xml
可看出:
-
listener
监听器: 负责web
应用初始化; 例如,启动后台线程去加载配置文件对应
eureka-server
里,就是com.netflix.eureka.EurekaBootStrap
-
filter
过滤器:对请求进行处理
核心的 filter
有四个:
当然,每个
filter
在web.xml
也定义了相对应的URL
匹配。
-
StatusFilter
:负责状态相关的处理逻辑 -
ServerRequestAuthFilter
:授权认证相关的处理逻辑 -
RateLimitingFilter
:负责限流相关的处理逻辑 -
GzipEncodingEnforcingFilter
:gzip
,压缩相关的
最后一个是 jersey
的 ServletContainer
: 接收所有的请求,作为请求的入口
<filter>
<filter-name>jersey</filter-name>
<filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
... ...
</filter>
复制代码
(2)一些概念
读源码准备,Eureka
的一些概念:
Register
服务注册Renew
服务续约Fetch Registries
获取服务注册列表信息Cancel
服务下线Eviction
服务剔除
二、从源码中学到了什么
凡凡觉得 “学到了” 的东西。
(1)线程池的使用
学习他人怎么使用线程池,方便以后自己写架构时候,拿来就用。
源码中线程池有:
-
调度线程池
-
心跳线程池
-
缓存刷新线程池
-
调度线程池
private final ScheduledExecutorService scheduler; // 支持调度的线程池,核心线程数 2个,使用 google 工具包中线程工厂,后台线程 scheduler = Executors.newScheduledThreadPool( 2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); 复制代码
-
心跳线程池
private final ThreadPoolExecutor heartbeatExecutor; // 支持心跳的线程池: 核心线程数1个,默认最大线程数5个,且线程存活时间为0,后台线程 heartbeatExecutor = new ThreadPoolExecutor( 1, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() 复制代码
-
缓存刷新线程池
private final ThreadPoolExecutor cacheRefreshExecutor; // 支持缓存刷新的线程池: 核心线程数1个,默认最大线程数5个,且线程存活时间为0,后台线程 cacheRefreshExecutor = new ThreadPoolExecutor( 1, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); 复制代码
(3)循环队列
在
com.netflix.eureka.registry.AbstractInstanceRegistry.java
里定义两个循环队列。
// 最近注册的队列 private final CircularQueue<Pair<Long, String>> recentRegisteredQueue; // 最近取消的队列 private final CircularQueue<Pair<Long, String>> recentCanceledQueue; 复制代码
private class CircularQueue<E> extends ConcurrentLinkedQueue<E> {
private int size = 0;
public CircularQueue(int size) {
this.size = size;
}
@Override
public boolean add(E e) {
this.makeSpaceIfNotAvailable();
return super.add(e);
}
private void makeSpaceIfNotAvailable() {
if (this.size() == size) {
this.remove();
}
}
public boolean offer(E e) {
this.makeSpaceIfNotAvailable();
return super.offer(e);
}
}
复制代码
三、直接怼源码
(1)初始化
在eureka-server
初始化时,就会触发这个监听器:
在
eureka-server/src/main/webapp/WEB-INF/web.xml
定义了
<listener>
<listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
</listener>
复制代码
而这个监听器的执行初始化的方法,contextInitialized()
方法,源码如下:
// 定位:eureka-core/src/main/java/com/netflix/eureka/EurekaBootStrap.java
@Override
public void contextInitialized(ServletContextEvent event) {
try {
// 1. 初始化环境变量
initEurekaEnvironment();
// 2. 初始化上下文
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
复制代码
目前源码着手点,可分为两点:
-
初始化环境变量
initEurekaEnvironment()
-
初始化上下文
initEurekaServerContext()
1)初始化环境变量 initEurekaEnvironment()
主要过程步骤如下:
-
创建
ConcurrentCompositeConfiguration
:代表了所谓的配置,包括了eureka
需要的所有的配置1.1 在创建时候,先调用了
clear()
方法1.2 之后,
fireEvent()
发布了一个事件(EVENT_CLEAR
) -
往
ConcurrentCompositeConfiguration config
实例加入了一堆别配置2.1 初始化数据中心的配置,如果没有配置的话,就是
DEFAULT data center
2.2 初始化
eureka
运行的环境,如果没有配置,默认设置为test
环境
进入方法,查看对应源码:
// 定位:eureka-core/src/main/java/com/netflix/eureka/EurekaBootStrap.java
protected void initEurekaEnvironment() throws Exception {
logger.info("Setting the eureka configuration..");
// 1. 重要的是:
String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
// 2. 其他别的配置
... ...
}
复制代码
进入 ConfigurationManager.getConfigInstance()
,可看到:
// 定位:com.netflix.config.ConfigurationManager.java
public static AbstractConfiguration getConfigInstance() {
if (instance == null) {
synchronized (ConfigurationManager.class) {
if (instance == null) {
instance = getConfigInstance(Boolean.getBoolean(DynamicPropertyFactory.DISABLE_DEFAULT_CONFIG));
}
}
}
return instance;
}
复制代码
初始化配置管理器(ConfigurationManager
):采用 双检查锁(double check
)和volatile
单例模式 来实现线程安全。
进入 getConfigInstance()
方法:
// 定位:com.netflix.config.ConfigurationManager.java
private static AbstractConfiguration getConfigInstance(boolean defaultConfigDisabled) {
if (instance == null && !defaultConfigDisabled) {
// 1. 创建 ConcurrentCompositeConfiguration
instance = createDefaultConfigInstance();
registerConfigBean();
}
return instance;
}
private static AbstractConfiguration createDefaultConfigInstance() {
// 创建主要配置
ConcurrentCompositeConfiguration config = new ConcurrentCompositeConfiguration();
// 加载其他配置
... ...
return config;
}
// 定位:com.netflix.config.ConcurrentCompositeConfiguration.java
// 1.1. 创建主要配置时,会先清理
public ConcurrentCompositeConfiguration() {
clear();
}
public final void clear() {
// 1.2 发布事件
fireEvent(EVENT_CLEAR, null, null, true);
... ...
fireEvent(EVENT_CLEAR, null, null, false);
... ...
}
复制代码
2)初始化上下文 initEurekaServerContext()
主要过程步骤如下:
- 第一步:加载
eureka-server
文件配置 - 第二步:初始化
eureka-server
内部的eureka-client
(用来跟其他的eureka-server
节点进行注册和通信) - 第三步:处理注册相关的事情
- 第四步:处理
peer
相关节点 - 第五步:完成
eureka-server
上下文(context
)的构建 - 第六步:处理一些善后的事情,从相邻的
eureka
节点拷贝注册信息 - 第七步:注册所有监控
protected void initEurekaServerContext() throws Exception {
// 第一步:加载 eureka-server 文件配置
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
// 第二步:初始化 eureka-server 内部的 eureka-client
//(跟其他的 eureka-server 节点进行注册和通信)
ApplicationInfoManager applicationInfoManager = null;
// 第三步:处理注册相关的事情
PeerAwareInstanceRegistry registry;
// 第四步:处理 peer 相关节点
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes();
// 第五步:完成 eureka-server 上下文(context)的构建
serverContext = new DefaultEurekaServerContext();
// 第六步:处理一些善后的事情,从相邻的 eureka 节点拷贝注册信息
int registryCount = registry.syncUp();
registry.openForTraffic(applicationInfoManager, registryCount);
// 第七步:注册所有监控
EurekaMonitors.registerAllStats();
}
复制代码
下面进入主要流程:
第一步:加载 eureka-server
文件配置
加载 eureka-server.properties
的过程:
-
创建了一个
DefaultEurekaServerConfig
对象,创建时会调用init()
方法DefaultEurekaServerConfig
提供的获取配置项的各个方法,都是通过硬编码的配置项名称,从DynamicPropertyFactory
中获取配置项的值,DynamicPropertyFactory
又是从ConfigurationManager
那儿获取来的,所以也包含了所有配置项的值。
// 1. init()
private void init() {
// 拿到当前的环境:eureka.environment
String env = ConfigurationManager.getConfigInstance().getString(
EUREKA_ENVIRONMENT, TEST);
// 设置当前属性
ConfigurationManager.getConfigInstance().setProperty(
ARCHAIUS_DEPLOYMENT_ENVIRONMENT, env);
// 加载 eureka 配置文件的名字: eureka-server
String eurekaPropsFile = EUREKA_PROPS_FILE.get();
// 2. 加载配置:eureka-server.properties
ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile);
}
复制代码
-
将
eureka-server.properties
中的配置加载到了一个Properties
对象中,然后将Properties
对象中的配置放到ConfigurationManager
中去,此时ConfigurationManager
中去就有了所有的配置了。eureka-server.properties
默认没有配置,所以读取的都是默认值。
第二步:初始化 eureka-server
内部的 eureka-client
ApplicationInfoManager applicationInfoManager = null;
if (eurekaClient == null) {
// 1. 将 eureka-client.properties 配置加载到 ConfigurationManager
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
// 2. 这期间会构建 InstanceInfo(服务实例的实例本身信息)
// 基于 EurekaInstanceConfig 和 InstanceInfo 构建了 ApplicationInfoManager
// ApplicationInfoManager:是对服务实例进行管理
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
// 3. 创建 eureka-client 配置项
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
// 4. 创建 EurekaClient,DiscoveryClient 是其子类
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
复制代码
-
加载服务实例相关配置:将
eureka-client.properties
配置加载到ConfigurationManager
,并提供EurekaInstanceConfig
详细可看:
PropertiesInstanceConfig
问题:这里不是初始化
eureka-server
嘛?为什么要加载eureka-client
?eureka-server
同时也是一个eureka-client
,因为他可能要向其他的eureka-server
去进行注册,从而组成一个eureka-server
的集群。 -
基于
EurekaInstanceConfig
和InstanceInfo
构建了ApplicationInfoManager
-
创建
eureka-client
配置项 -
重点:创建
EurekaClient
,DiscoveryClient
是其子类// 定位:com.netflix.discovery.DiscoveryClient.java DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { ... ... // 1. 读取 EurekaClientConfig,包括 TransportConfig clientConfig = config; transportConfig = config.getTransportConfig(); // 2. 保存 EurekaInstanceConfig 和 InstanceInfo instanceInfo = myInfo; ... ... // 3. 是否需要抓取注册表 if (config.shouldFetchRegistry()) {... ...} // 3. 是否需要注册到 eureka if (config.shouldRegisterWithEureka()) {... ...} try { // 4. 支持调度的线程池,核心线程数 2个,使用 google 工具包中线程工厂 scheduler = Executors.newScheduledThreadPool(... ...); // 5. 支持心跳的线程池: 核心线程数1个,默认最大线程数5个,且线程存活时间为0 heartbeatExecutor = new ThreadPoolExecutor(... ...); // 6. 支持缓存刷新的线程池: 核心线程数1个,默认最大线程数5个,且线程存活时间为0 cacheRefreshExecutor = new ThreadPoolExecutor(... ...); // 7. 创建 EurekaTransport: // 支持底层的 eureka-client 跟 eureka-server 进行网络通信的组件 eurekaTransport = new EurekaTransport(); ... ... } // 8. 抓取注册表 // 这里代码写的不好,难以懂得 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // 9. 初始化调度任务 initScheduledTasks(); ... ... } 复制代码
步骤如下:
- 读取
EurekaClientConfig
,包括TransportConfig
- 保存
EurekaInstanceConfig
和InstanceInfo
- 是否要注册以及抓取注册表,如果不要的话,释放一些资源
- 支持调度的线程池
- 支持心跳的线程池
- 支持缓存刷新的线程池
- 创建
EurekaTransport
:支持底层的eureka-client
跟eureka-server
进行网络通信的组件 - 抓取注册表
- 初始化调度任务:
- 如果要抓取注册表的话,就会注册一个定时任务,按照设定的抓取的间隔(默认是30s),在调度线程池去执行
CacheRefreshThread
- 如果要向
eureka-server
进行注册的话,启动定时任务,每隔一定时间发送心跳,执行HeartbeatThread
- 创建了服务实例副本传播器,将自己作为一个定时任务进行调度
- 创建了服务实例的状态变更的监听器,如果配置了监听,那么就会注册监听器
- 如果要抓取注册表的话,就会注册一个定时任务,按照设定的抓取的间隔(默认是30s),在调度线程池去执行
- 读取