日常写组件,最近又接了一个需求,让我负责实现一个rpc组件,提高公司游戏跨服开发的效率,为了写好这个组件,算是将dubbo里里外外研究了一波,目前组件的实现也接近尾声了,因此打算给dubbo的学习做个总结,并穿插说说rpc实现的心路历程,同样需要实现rpc的朋友,或者对dubbo有兴趣的朋友可以关注这个系列。
在写rpc组件之前,我先提了几个灵魂疑问,并从dubbo中找到了答案。
服务是啥?
一个模块,一种玩法,只要是需要进行远程调度的都可以用服务的概念进行包装,我这边简单包装了一个副本服务,类情况如下
平平无奇,等等,我们来看看提供方如何标记服务
到了这一步,服务已经完成了基本定义。
服务最终被注册到了那里?
在xml配置上,我们已经看到了有注册中心的配置
没错,最后提供方定义好的服务会注册到注册中心,目前支持的类型有多种
具体可以查看里边提供的demo实例,那么注册中心有什么作用呢?
简单点描述就是注册中心就是管理服务的地方,提供方将服务放到了这个管理处,而订阅方要用的话则从这个管理处将服务拿过来用,通过注册中心实现了服务的感知。
服务谁来消费?
消费方来使用,我们可以看到
同样也是平平无奇的代码,就是消费方拿到boss接口后,直接调用对应接口即可。
对应提供方有xml去定义服务的注册,同样消费方也是有xml去定义服务的订阅信息,可以看到
简单来说就是,提供方将服务放到注册中心,订阅方从注册中心拿来用。
接下来会涉及到源码部分,以下源码的示例接来自dubbo2.6x,源码方面的注释都已经提交到github上,有需要的可以clone:
什么时候触发的服务暴露
在设计rpc组件的时候,不得不面对这个问题,本着抄dubbo的想法,研究了下dubbo的实现方案
dubbo采用了比较经典的xml配置,并理所当然的使用了NamespaceHandlerSupport将xml中的节点配置映射成了对应对象
可以看到在dubbo-config-spring包底下有个spring.handlers的配置,通过该配置指定了DubboNamespaceHandler
DubboNamespaceHandler会将xml配置对应标签的配置映射成对象,比如service
看看ServiceBean在映射成对象后做了啥,先看看ServiceBean结构
自身是一个监听器,再通过CTRL+F12看看有哪些方法
看到export暴露这个方法后,ALT+F7反调下发现除了注解Annoatition外有两个地方调用,分别是
第一种是在属性被设置后调用,可以看到如果是延迟函数则不会调用。
第二种是看到isDelay的时候才会调用export,也就是说延迟暴露的服务是在监听到ContextRefreshedEvent事件后进行调用的。
在export方法内可以看到
可以针对不同的服务配置配置delay延迟时间,具体的肯定是在xml上配置了。
触发机制到这里基本就结束了,总结一下dubbo的触发机制就是建立在NamespaceHandlerSupport上,将xml中的标签实例化,并通过在afterPropertiesSet或者在监听到Spring容器抛出的容器刷新事件后,触发服务的暴露。
画个流程图总结下
由于我司这边的服务配置最终落地在使用yaml方案上,不引入xml,最终我并没有使用NamespaceHandlerSupport去实例化,而是模仿dubbo3.0的方案包装了一个ServiceBootstrap对象,依赖SmartLifeCycle的生命周期,在start的时候取到yaml的配置,遍历进行服务暴露。dubbo3.0做了比较大调整,后续会专门讲,有兴趣的持续关注该系列。
提一波URL
在说服务暴露之前必须先提一波URL,否则主线没了,后续不好讲。
在我没有接触到dubbo之前,我对URL的定位是指网络地址,而在dubbo中,可以认为是一种约定,几乎dubbo的所有模块都是通过URL来传参,这有什么好处呢?
我们可以想想,如果没有约定好,那么不同的接口之间进行交互的参数便会乱掉,一会是字符串,一会是map,而有了统一的约定后,代码便会更加的规范和统一,我们在看代码的时候也会比较清晰,也容易拓展,比如如果你想拓展什么东西,直接往URL上拼接参数就可以了。
我们可以看到,除了几个基础的参数外,很多参数其实最终都放到了parameters中。
而在我司项目中,我们参考了URL的设计,构建了元数据的结构,也就是map,将服务的部分动态参数通过map进行传递。
服务暴露过程
在深入源码之前先大概总结下服务暴露的几个步骤,分别是:
- 配置的构建、合并、检查。
- URL的组装。
- 服务的暴露、注册。
我将这三个主要的过程放入流程图内
继续跟进服务暴露的具体逻辑,也就是doExport后
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
// TODO: 2021/5/27 检查provider是否为空,为空则创建一个,并通过系统变量为其初始化
checkDefault();
/** 各种初始值的设置 **/
// TODO: 2021/5/27 检查Application是否为空
checkApplication();
// TODO: 2021/5/27 检查注册中心是否为空
checkRegistry();
// TODO: 2021/5/27 检查protocols是否为空
checkProtocol();
// TODO: 2021/5/27 补充各种参数
appendProperties(this);
// TODO: 2021/5/27 Stub合法性检查
checkStub(interfaceClass);
// TODO: 2021/5/27 mock合法性检查
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// TODO: 2021/5/27 多协议多注册中心暴露服务
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
复制代码
总结下来不外乎两步:
-
对各类配置进行校验,并且更新部分配置;
-
多协议多注册中心暴露服务;
其中检查的细节暂时不铺开,因为服务暴露整个过程才是重点,后续服务治理了再重新讲这块,接下来继续讲重点doExportUrls方法
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
// TODO: 2021/5/27 加载注册中心URL
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
// TODO: 2021/5/27 根据不同协议进行服务暴露
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
复制代码
loadRegistries也很简单,其实就是根据注册中心的配置组装成URL,这里多个注册中心比较好理解,多个protocols是什么鬼呢?
其实是这样的,一个服务如果有多个协议那么就都需要暴露,比如同时支持 dubbo 协议和 hessian 协议,那么需要将这个服务用两种协议分别向多个注册中心暴露注册。
参考了这块逻辑,在我司项目中,我们规范了注册中心的接口,允许注册中心有多种实现, 甚至是本地注册中心,但是并不允许有多个注册中心,目前来说是没有这种需求,而要选择哪个注册中心,只需要在yaml文件上进行配置即可
接下来看doExportUrlsFor1Protocol方法
在分析服务暴露流程之前便有提到过,dubbo内部使用URL来携带各类数据,从而贯穿整个生命周期的,而入口其实就是从这个方法开始的,等下我们便可以看到该方法可以分为两个步骤,前个步骤是组装URL的逻辑,后个步骤是真正实现暴露dubbo服务等逻辑的地方,不说了,继续code
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
/**组装服务的URL开始**/
// TODO: 2021/5/27 获取协议名
String name = protocolConfig.getName();
// TODO: 2021/5/27 如果为空,则默认是dubbo
if (name == null || name.length() == 0) {
name = "dubbo";
}
// TODO: 2021/5/27 设置map等各种参数
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// TODO: 2021/5/27 添加application、module、provider等信息到map中
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// TODO: 2021/5/27 如果methods的配置列表不为空,则遍历methods配置列表
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// TODO: 2021/5/27 把方法名加入map
appendParameters(map, method, method.getName());
// TODO: 2021/5/27 添加methodconfig对象的字段信息到map中
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
// TODO: 2021/5/27 添加ArgumentConfig列表
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
// TODO: 2021/5/27 利用反射拿到接口类的所有方法
Method[] methods = interfaceClass.getMethods();
if (methods != null && methods.length > 0) {
// TODO: 2021/5/27 遍历methods
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// TODO: 2021/5/27 找到目标方法
if (methodName.equals(method.getName())) {
// TODO: 2021/5/27 通过反射拿到方法参数类型
Class<?>[] argtypes = methods[i].getParameterTypes();
// TODO: 2021/5/27 如果下表为-1
if (argument.getIndex() != -1) {
// TODO: 2021/5/27 检测argtypes的名称与ArgumentConfig中的type是否一致
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
// TODO: 2021/5/27 不一致则抛出异常
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// TODO: 2021/5/27 遍历参数,查找argument.type的类型
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
// TODO: 2021/5/27 如果找得到则将ArgumentConfig字段添加map中
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
// TODO: 2021/5/27 用户未配置type属性,但配置了index属性,且index != -1,则直接添加到map中
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// TODO: 2021/5/27 如果是泛化调用,则在map中设置generic和methods
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// TODO: 2021/5/27 获得版本号
String revision = Version.getVersion(interfaceClass, version);
// TODO: 2021/5/27 放入map中
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// TODO: 2021/5/27 获得方法集合
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
// TODO: 2021/5/27 设置方法为*
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// TODO: 2021/5/27 否则加入方法集合中
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// TODO: 2021/5/27 将token加入map
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// TODO: 2021/5/27 获得地址、端口号
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// TODO: 2021/5/27 组装生成URL
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
/**组装服务的URL结束**/
/*
* 后续讲解服务暴露
*/
}
复制代码
这个方法实在是又臭又长,我特意分成两部分,目前这部分是组装服务的URL部分,其实简单点说就是:
先将provider、applicaiton、module等各种基础配置直接放入map中,再针对method配置等进行校验,查看该配置是否有配置方法存在,并进行方法签名的校验,如果是才放入map中,然后还额外将一些多余数据,比如泛化调用、版本号等加入map中,最终根据host和port,结合map组装成URL,貌似还是有点长。
总归就是结合服务自身的各种配置放入map中,然后根据host和port以及map等生成URL就是了。
接下来看看后续服务暴露部分
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
/*
* 前面URL组装
*/
// TODO: 2021/5/27 加载ConfiguratorFactory,并生成Configurator实例,判断是否有该协议的实现存在
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// TODO: 2021/5/27 通过SPI机制配置URL
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// TODO: 2021/5/27 如果scope为none,则什么都不做
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// TODO: 2021/5/27 如果scope不是远程,则暴露到本地
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
/** 本地服务暴露 **/
exportLocal(url);
}
// TODO: 2021/5/27 如果不是local,则暴露到远程
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
// TODO: 2021/5/27 遍历注册中心
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// TODO: 2021/5/27 加载监视器连接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// TODO: 2021/5/27 如果没有则添加一个
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// TODO: 2021/5/27 根据URL拿到代理方式
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
// TODO: 2021/5/27 给注册中心的URL添加代理方式
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// TODO: 2021/5/24 通过SPI机制拿到对应的proxyFactory
/** 根据proxyFactory拿到Invoker **/
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// TODO: 2021/5/24 通过SPI机制拿到对应的protocol,先是RegistryProtocol,再被AOP强化
/** 服务暴露 **/
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// TODO: 2021/5/24 通过SPI机制拿到对应的proxyFactory
/** 根据proxyFactory拿到Invoker **/
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// TODO: 2021/5/24 通过SPI机制拿到对应的protocol
/** 服务暴露 **/
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
复制代码
后续重要的地方可以认为其实就是遍历注册中心进行服务暴露,只是会根据服务配置域scope来针对性做一些暴露处理,比如如果scope不是远程,则暴露到本地,如果不是local,则暴露到远程。
该方法中又包含了几个核心的拓展实现,包括:
- 本地服务暴露
- 根据proxyFactory拿到Invoker
- 远程服务暴露、注册
继续补充流程图,整理思路
首先第1点,看看本地服务暴露逻辑
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
// TODO: 2021/5/27 根据SPI拿到了InjvmProtocol调用了export方
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
// 放入集合中缓存
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
复制代码
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
复制代码
暴露到本地的大致逻辑其实就是根据SPI机制拿到了InjvmProtocol生成了InjvmExporter,之后放入集合缓存中,至于SPI机制,后续需要开个文章专门讲讲,有兴趣持续关注该系列。
为啥要有本地服务暴露?
大致原因应该是因为可能存在同一个 JVM 内部引用自身服务的情况,因此暴露的本地服务在内部调用的时候可以直接消费同一个 JVM 的服务避免了网络间的通信。
继续看第2点,根据proxyFactory拿到Invoker部分,首先我们看ProxyFactory类名就大概可以猜到该类具备生成代理对象的能力,我们看proxyFactory的生成模式
可以看到,该对象也是通过SPI机制生成的,由于SPI机制也是比较庞大的,为了避免混淆,后续再开篇文章讲解,有兴趣的持续关注。
通过SPI机制拿到了ProxyFactory的实现对象JavassisProxyFactory,最终调用的代码
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO: 2021/5/23 为目标类创建Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// TODO: 2021/5/23 创建匿名的Invoker对象,并实现doInvoker方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// TODO: 2021/5/23 调用Wrapper的invokeMethod方法,invokeMethod最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
复制代码
该方法就是创建了一个匿名的Inovker对象,在doInvker方法中调用wrapper.invokeMethod方法,invokeMethod最终会调用目标方法。
那么wrapper又是啥?
Wrapper是一个抽象类,在调用Wrapper.getWrapper创建子类的时候,会根据目标Class对象进行解析,拿到各种方法、类成员变量等信息,以及生成invokeMethod方法等代码,在代码生成完毕后,通过Javassist生成Class对象,可以理解为该Class对象就是BossServiceImpl的代理实例,有兴趣了解生成过程的可以看Wrapper.makeWrapper方法。
为啥一定要封装Invoker?
其实就是为了屏蔽本地调用或者远程调用或者集群调用的细节,统一暴露出一个可执行体,方便调用者调用,而不管怎么封装,其实最终都是调向目标方法。
为啥要封装Exporter?
这个涉及到后续服务被具体调用,后面会开一篇文章专门讲这个,有兴趣的可以持续关注。
在我司的rpc框架中,倒是没有使用Javassist去生成代理对象,而是选择了使用jdk提供的Proxy生成机制。
继续补充流程图,整理思路
接下来说说远程服务暴露
远程服务暴露要比本地复杂的多,在doExportUrlsFor1Protocol后半部分,通过proxyFactory生成Inovker后,就需要调用protocol.export做真的服务暴露了,我们可以看到protocol是如何实例化的
又是通过SPI实例化的,通过断点可以看到会先被AOP切面拦截额外做了一些其他的操作,不过最终走向的RegisterProtocol,AOP这块后续再分析,有兴趣的持续关注。
接下来继续看RegisterProtocol.export做了啥
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// TODO: 2021/5/29 服务暴露
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// TODO: 2021/5/23 获得注册中心的URL
URL registryUrl = getRegistryUrl(originInvoker);
final Registry registry = getRegistry(originInvoker);
// TODO: 2021/5/23 获得已经注册的服务提供者URL
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
// TODO: 2021/5/29 真正做服务注册的地方
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// TODO: 2021/5/23 获取override订阅URL
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
// TODO: 2021/5/23 创建override的监听器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
// TODO: 2021/5/23 缓存监听器到集合中
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// TODO: 2021/5/23 向注册中心订阅override数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// TODO: 2021/5/23 创建并返回DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
复制代码
从代码上看,该方法其实做了两件事情,分别是服务暴露和注册:
- 执行了doLocalExport进行服务暴露
- 加载注册中心实现类,向注册中心注册服务
- 向注册中心订阅override数据
- 创建并返回DestroyableExporter
接下来继续看看doLocalExport做了啥
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// TODO: 2021/5/24 创建Invoker为委托对象
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// TODO: 2021/5/24 调用protocol的export方法暴露服务
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
复制代码
看逻辑比较简单,主要是根据不同协议配置,根据SPI调用不同的protocol实现,跟暴露到本地时实现的InjvmPortocol一样,默认这里调用的是DubboProtocol.export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// TODO: 2021/5/29 得到服务key,格式:group+"/"+serviceName+":"+serviceVersion+":"+port
String key = serviceKey(url);
// TODO: 2021/5/29 创建exporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// TODO: 2021/5/24 开启服务器
openServer(url);
// TODO: 2021/5/29 序列化
optimizeSerialization(url);
return exporter;
}
复制代码
可以到export先是new了一个DubboExporter对象, 后续打开了服务,接下来继续看openServer做了啥
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
// TODO: 2021/5/24 启动一个服务实例
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
// TODO: 2021/5/29 服务器关闭是发送readonly时间
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// TODO: 2021/5/29 心跳默认时间
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// TODO: 2021/5/29 获得远程通讯服务端实现方式
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// TODO: 2021/5/29 添加编解码器DubboCodec实现
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// TODO: 2021/5/29 启动服务器
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
复制代码
可以看到最终还是依赖URL携带的远程通讯实现方法创建了一个服务器对象。
总结一下:doLocalExport最终其实就是根据URL开启了服务器,并返回了Exporter。
接下来继续看注册服务部分
public void register(URL registryUrl, URL registedProviderUrl) {
// TODO: 2021/5/29 获取注册中心实例
Registry registry = registryFactory.getRegistry(registryUrl);
// TODO: 2021/5/29 调用register
registry.register(registedProviderUrl);
}
复制代码
Regsitry的生成最终也是依赖了SPI机制,最终走向FailbackRegistry.register
@Override
public void register(URL url) {
super.register(url);
// TODO: 2021/5/24 从失败的集合中移除
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// TODO: 2021/5/24 向注册中心发起注册请求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// TODO: 2021/5/29 发生异常则放入failedRegistered
failedRegistered.add(url);
}
}
复制代码
可以看到注册的核心实现是在doRegister中,不过通过代码机制我们也可以看出,在注册报错的时候会被trycatch拦截,然后放入failedRegistered容器中,结合FailbackRegistry该类名可以推测应该是有个重试机制存在,看看构造方法
// TODO: 2021/5/24 从url中获取重试频率参数,启动定时器进行重试逻辑
public FailbackRegistry(URL url) {
super(url);
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
// TODO: 2021/5/29 定时重试
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
复制代码
果不其然,最终如果注册发生了异常,则会进行定时重试。
关于重试机制也是要有的,在我司的rpc框架中,我们将重试时间放在yaml上去配置,不过定时器并没有采用Executor机制,而是模仿了dubbo3.0的写法,也就是时间轮的机制,性能更好。
接下来看注册核心部分doRegister,可以看到该方法是一个抽象方法,由于我在xml配置中配置的注册中心是Zookeeper,因而最终走向ZookeeperRegistry
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
复制代码
服务注册走到这里基本到头了,再深入便是看注册中心的实现了。
接下来看看向注册中心订阅override数据部分
上面有说过registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)最终走向的方法是FailbackRegistry.subscribe
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// TODO: 2021/5/29 真正做订阅的地方
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// TODO: 2021/5/29 订阅失败,则放入失败容器中
addFailedSubscribed(url, listener);
}
}
复制代码
同样,订阅失败后也是放入失败容器中,定时重试进行订阅。
再看看核心实现方法doSubscribe方法,最终走向ZookeeperRegistry.doSubscribe中
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// TODO: 2021/5/29 处理URL参数中interface为*的订阅,例如监控中心的订阅
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
/** 先无视 **/
} else {
List<URL> urls = new ArrayList<URL>();
// TODO: 2021/5/29 遍历分类数组
for (String path : toCategoriesPath(url)) {
// TODO: 2021/5/29 获得监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
// TODO: 2021/5/29 如果没有则创建
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// TODO: 2021/5/29 获得监听器
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// TODO: 2021/5/29 通知服务变化,回调NotifyListener
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
// TODO: 2021/5/29 创建节点,如:/dubbo/com.alibaba.dubbo.demo.DemoService/providers
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// TODO: 2021/5/29 通知数据变更,如RegistryDirectory
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
复制代码
这个方法主要做了订阅和监听触发逻辑,具体逻辑就是订阅了某个服务的URL,在服务变更的时候触发逻辑变化。其实此处已经是可以归纳入服务治理模块了,后续会有专门的文章分享服务治理,有兴趣可以持续关注。
画个流程图,整理下思路
看到这里服务暴露流程基本理完了,还是有点东西在里面的,并且还需要掌握 Dubbo SPI,不然有些点例如自适应什么的还是很难理解的,为了写这篇文章,我前前后后也是花了不少的时间。
最后我再来一张完整的流程图带大家再过一遍,具体还是有很多细节,不过不是主干我就不做分析了,不然文章散掉了。
后续服务治理、APO、SPI机制也会在该流程图上进行拓展,有兴趣的也可以关注流程图链接:
www.processon.com/view/link/6…
总结
虽然看完了该篇文章,但是还是建议大家自己打断点过一遍,可以更加清晰,而如果是为了应付面试官提问的话,基本上记住上面流程图的内容就差不多了,当你研究完了dubbo后,其实会发现dubbo有很多东西可以写,比如服务应用、SPI、dubbo中的AOP机制、服务治理等好几个模块,最后就是带大家撸一个RPC框架了。
还是那句话,想学dubbo的可以点击原文链接,持续关注这一系列。
原文链接:mp.weixin.qq.com/s/gsPa2KHS1…
文章还不错的麻烦给个赞!