服务注册
时序图
dubbo官网的关于提供方暴露服务的蓝色初始化链
服务提供者暴露一个服务的详细过程
首先 ServiceConfig 类拿到对外提供服务的实际类 ref(如:HelloWorldImpl),然后通过 ProxyFactory 类的 getInvoker 方法使用 ref 生成一个 AbstractProxyInvoker 实例,到这一步就完成具体服务到 Invoker 的转化。接下来就是 Invoker 转换到 Exporter 的过程。
Dubbo 处理服务暴露的关键就在 Invoker 转换到 Exporter 的过程,上图中的红色部分。下面我们以 Dubbo 和 RMI 这两种典型协议的实现来进行说明:
Dubbo 的实现
Dubbo 协议的 Invoker 转为 Exporter 发生在 DubboProtocol 类的 export 方法,它主要是打开 socket 侦听服务,并接收客户端发来的各种请求,通讯细节由 Dubbo 自己实现。
RMI 的实现
RMI 协议的 Invoker 转为 Exporter 发生在 RmiProtocol类的 export 方法,它通过 Spring 或 Dubbo 或 JDK 来实现 RMI 服务,通讯细节这一块由 JDK 底层来实现,这就省了不少工作量。
ServiceConfig.export
首先结合时序图,先进入ServiceConfig的export方法。在之前都是一些判断方法,真正做事的是从doExportUrls开始
doExportUrls
private void doExportUrls() {
//注册URL
List<URL> registryURLs = loadRegistries(true);
//遍历ProtocolConfig配置
for (ProtocolConfig protocolConfig : protocols) {
//设置pathKet
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
//封装提供者对象
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
//加入providedServices集合中 生产者服务集合
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
复制代码
ProviderModel对象结构如下:包括了类路径、接口、实现以及方法。
doExportUrlsFor1Protocol
然后进入到doExportUrlsFor1Protocol方,在这之前都是做一些配置写入Map的操作:
看重点方法
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// 如果配置的本地暴露
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 如果配置的远程暴露
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
...
if (CollectionUtils.isNotEmpty(registryURLs)) {
//遍历注册URL
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
//读取监听配置
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
...
//检查是否有自定义代理
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
//生成invoker对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
复制代码
ProxyFactory对象
此处可以看一下ProxyFactory对象,是@Adaptive支持spi的,默认实现是javassist,此处我们没配置代理就是默认走javassist代理
getInvoker方法返回了AbstractProxyInvoker对象
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
复制代码
继续回到doExportUrlsFor1Protocol方法
//包装invoker对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
复制代码
protocol对象
这里再看一下protocol对象,和proxyfactory对象一样是一个spi扩展,默认是使用dubbo。
重点查看invoker对象如何转为protocol对象的。
在进入export方法之前先看一下wrapperInvoker对象,可以发现制定了protocol实现为register
RegistryProtocol
进入RegistryProtocol.export
//获取注册中心地址
URL registryUrl = getRegistryUrl(originInvoker);
//获取提供者需要注册的地址
URL providerUrl = getProviderUrl(originInvoker);
// 获取进行 注册override协议 的访问地址
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 增加override的监听器,当前配置变更,则重写配置
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
//加入缓存
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 根据现有的override协议,对注册地址进行改写操作
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 对当前的服务进行本地导出
// 完成后即可在看到本地的20881端口号已经启动,并且暴露服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 获取注册器
final Registry registry = getRegistry(originInvoker);
复制代码
这边ctrl+h查看一下Registry关系图,可以看到有各种注册中心的Register,这边我们用的是zk的
//获取到需要注册的提供者URL 去除无效参数
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
//包装
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//是否需要延迟发布
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
//注册
register(registryUrl, registeredProviderUrl);
复制代码
register方法 调用FallBackRegistry的模板方法
public void register(URL registryUrl, URL registeredProviderUrl) {
//通过spi url获取对应的实现类执行方法 此处为zkRegistry
Registry registry = registryFactory.getRegistry(registryUrl);
//注册 次方法是模板方法,要先走到父类FallBackRegistry中
registry.register(registeredProviderUrl);
}
复制代码
FallBackRegistry.register标记已注册url调用zk注册节点
public void register(URL url) {
//走父类AbstractRegister方法 标记为已注册
super.register(url);
//移除失败已注册URL
removeFailedRegistered(url);
//移除失败未注册URL
removeFailedUnregistered(url);
try {
//子类实现
doRegister(url);
复制代码
zookerRegistry.doRegister注册节点
public void doRegister(URL url) {
try {
//创建节点
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
复制代码
toUrlPath方法
private String toUrlPath(URL url) {
//分类地址(/dubbo/接口路径/角色(生产、消费))+"/" + url字符串
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
复制代码
toCategoryPath方法
private String toCategoryPath(URL url) {
// /dubbo/接口路径+"/"+角色
return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}
复制代码
toServicePath方法
private String toServicePath(URL url) {
//接口路径
String name = url.getServiceInterface();
if (Constants.ANY_VALUE.equals(name)) {
return toRootPath();
}
//目录+接口路径
return toRootDir() + URL.encode(name);
}
复制代码
回到FallBackRegistry.register方法
} catch (Exception e) {
Throwable t = e;
// 启动检测打开,记录日志或者抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 异步重试
addFailedRegistered(url);
}
}
复制代码
回到RegistryProtocol.export方法
providerInvokerWrapper.setReg(true);
}
// 对override协议进行注册,用于在接收到override请求时做适配,这种方式用于适配2.6.x及之 前的版本(混用)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
//返回exporter对象
return new DestroyableExporter<>(exporter);
复制代码
回到ServiceConfig.export方法
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
//加入export集合
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
复制代码
provider总结
从上述流程可知道 首先调用ServiceConfig的doExportUrlsFor1Protocol方法,然后由proxyFactory的getInvoke方法生成invoker对象,再由Protocol.export方法将invoker转为exporter对象,并通过Registry.register方法注册服务信息到zk。
服务消费
引用服务时序
下图为官网的引用服务时序图
服务消费者消费一个服务的详细过程
上图是服务消费的主过程:
首先 ReferenceConfig 类的 init 方法调用 Protocol 的 refer 方法生成 Invoker 实例(如上图中的红色部分),这是服务消费的关键。接下来把 Invoker 转换为客户端需要的接口(如:HelloWorld)。
ReferenceConfig.get 调用init方法
首先通过时序图锁定方法,看源码一定要有目的性的看。
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
复制代码
可以看到一些判断检查,主要的方法就是init()方法
ReferenceConfig.init
首先和ServiceConfig一样先做一些数据准备以及校验,数据如下:
这边直接看重点方法creatProxy
ReferenceConfid.creatProxy 生成可用代理对象
这一步是先通过protocol转为invoker对象,然后通过proxyFactory生成可用的impl对象
private T createProxy(Map<String, String> map) {
//是否本地调用 如果没有URL就且本地有缓存 选本地
if (shouldJvmRefer(map)) {
URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
// 指定URL则进入 这边没有制定了走esle
if (url != null && url.length() > 0) {
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
}
//从注册中心配置中获取URL
else {
//检查注册中心
checkRegistry();
//获取注册地址 传入false表示是否是提供方
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
//监听URL存在 放入map
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
//拼接结果如下
//refer -> application%3Ddubbo-service-consumer%26default.check%3Dfalse%26
//default.lazy%3Dfalse%26default.sticky%3Dfalse%26dubbo%3D2.0.2%26
//interface%3Dcom.study.service.HelloDubbo%26lazy%3Dfalse%26loadbalance%3Drandom%26methods%3DsayHello%26pid%3D5113%26
//register.ip%3D172.16.8.233%26release%3D2.7.1%26side%3D
//consumer%26sticky%3Dfalse%26timestamp%3D1618564873618
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
...
}
}
//如果是一个URL 直接生成invoker对象
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
复制代码
RegistryProtocol.refer方法 此处和provider一样走的是RegistryProtocol
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
//获取注册中心对象 这里是ZookeeperRegistry
Registry registry = registryFactory.getRegistry(url);
//判断是不是RegistryService类型
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
//解析刚刚加入的refer属性
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
复制代码
RegistryProtocol.doRefer 真正生成invoker对象的方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//生成Directory对象
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
//设置注册中心对象
directory.setRegistry(registry);
//设置协议对象
directory.setProtocol(protocol);
// 获取所有的refer
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
//设置已注册消费者URL
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
//注册消费者 在zk中新增节点
registry.register(directory.getRegisteredConsumerUrl());
}
//构建路由链
directory.buildRouterChain(subscribeUrl);
//订阅各个目录 并触发监听修改信息
//RegistryDirectory实现了NotifyListener接口,会调用notify
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
复制代码
RegistryProtocol.subscribe
中间很多步骤,此处省略非重点步骤,如下是调用链
RegistryDirectiry.refreshInvoker
走到toInvokers方法,截取重点方法片段
//refreshInvoker方法
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
...
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
...
//此处将生成好的invoke放入RegistryDirectory中
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
...
}
//toInvokers方法
//本地获取
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
...
if (enabled) {
//生成invoker对象 ,传入的参数一个invoker对象,一个消费者url 一个提供者url
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
}
...
}
复制代码
protocol.refer(serviceType, url)
是通过dubboProtocol的refer方法
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
//传入服务接口类型,消费者url,消费者client,以及invoker集合
//生成DubboInvoker对象
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
复制代码
最后registryDirectory中invoker信息如下
回到RegistryProtocol.doRefer方法
//利用cluster封装directory对象,此时的directory已经存有了router provider consumer信息
Invoker invoker = cluster.join(directory);
//提供者消费者记录信息
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
复制代码
此处生成完invoker对象,信息如下
回到ReferenceConfig.createProxy方法
}
//多个URL
else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用最后一个注册中心的地址
}
}
if (registryURL != null) { // registry url is available
// 集群模式下使用 RegistryAwareCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
// 创建StaticDirectory实例,并由cluster合并invker暴露出一个invoker进行使用
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
...
// 创建代理对象
return (T) proxyFactory.getProxy(invoker);
}
复制代码
consumer总结
消费端的大体流程是这样的,首先是reference.init方法,其中比较重要的方法就是createProxy方法,这个方法 主要做的事情1.生成远程调用的invoker对象。2.将invoker对象转为proxy代理对象。
其中生成invoker对象是通过RegistryProtocol来实现的。
此处省略本地URL判断操作
- 执行RegistryProtocol.dorefer方法里面新建了RegistryDirectory对象。
- 执行RegistryDirectory.subscribe方法监听了provider、router等节点,这样就得到provider配置
- 执行Registry.doNotify方法触发RegistryDirectory.refreshInvoker方法
- 再由RegistryDirectory.toInvokers方法 调用DubboProtocol.refer方法生成invoker对象存入RegistryDirectory对象中,
- 并通过RegistryProtocol.dorefer方法中的CLustor对象合并invoker,暴露单个可使用对象最后
- 最后再由ProxyFactory生成代理对象返回。
URL解析
protocol://host:port/path?key=value&key=value
复制代码
URL主要有以下几部分组成:
- protocol: 协议,一般像我们的 provider 或者 consumer 在这里都是人为具体的协议
- host: 当前 provider 或者其他协议所具体针对的地址,比较特殊的像 override 协议所指定的host就是 0.0.0.0 代表所有的机器都生效
- port: 和上面相同,代表所处理的端口号
- path: 服务路径,在 provider 或者 consumer 等其他中代表着我们真实的业务接口
- key=value: 这些则代表具体的参数,这里我们可以理解为对这个地址的配置。比如我们 provider中需要具体机器的服务应用名,就可以是一个配置的方式设置上去。
注意:Dubbo中的URL与java中的URL是有一些区别的,如下:
- 这里提供了针对于参数的 parameter 的增加和减少(支持动态更改)
- 提供缓存功能,对一些基础的数据做缓存.