从零开始学Dubbo-进阶篇-服务注册与消费源码剖析

服务注册

时序图

dubbo官网的关于提供方暴露服务的蓝色初始化链
image.png

服务提供者暴露一个服务的详细过程

首先 ServiceConfig 类拿到对外提供服务的实际类 ref(如:HelloWorldImpl),然后通过 ProxyFactory 类的 getInvoker 方法使用 ref 生成一个 AbstractProxyInvoker 实例,到这一步就完成具体服务到 Invoker 的转化。接下来就是 Invoker 转换到 Exporter 的过程。

Dubbo 处理服务暴露的关键就在 Invoker 转换到 Exporter 的过程,上图中的红色部分。下面我们以 Dubbo 和 RMI 这两种典型协议的实现来进行说明:

Dubbo 的实现

Dubbo 协议的 Invoker 转为 Exporter 发生在 DubboProtocol 类的 export 方法,它主要是打开 socket 侦听服务,并接收客户端发来的各种请求,通讯细节由 Dubbo 自己实现。

RMI 的实现

RMI 协议的 Invoker 转为 Exporter 发生在 RmiProtocol类的 export 方法,它通过 Spring 或 Dubbo 或 JDK 来实现 RMI 服务,通讯细节这一块由 JDK 底层来实现,这就省了不少工作量。
image.png

ServiceConfig.export

首先结合时序图,先进入ServiceConfig的export方法。在之前都是一些判断方法,真正做事的是从doExportUrls开始

doExportUrls

private void doExportUrls() {
    //注册URL
    List<URL> registryURLs = loadRegistries(true);
    //遍历ProtocolConfig配置
    for (ProtocolConfig protocolConfig : protocols) {
        //设置pathKet
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        //封装提供者对象
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        //加入providedServices集合中 生产者服务集合
        ApplicationModel.initProviderModel(pathKey, providerModel);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
复制代码

ProviderModel对象结构如下:包括了类路径、接口、实现以及方法。
image.png

doExportUrlsFor1Protocol

然后进入到doExportUrlsFor1Protocol方,在这之前都是做一些配置写入Map的操作:

image.png

看重点方法

 if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {

    // 如果配置的本地暴露
    if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
        exportLocal(url);
    }
    // 如果配置的远程暴露
    if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        ...
        if (CollectionUtils.isNotEmpty(registryURLs)) {
            //遍历注册URL
            for (URL registryURL : registryURLs) {
                url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                //读取监听配置
                URL monitorUrl = loadMonitor(registryURL);
                if (monitorUrl != null) {
                    url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                }
                ...
                //检查是否有自定义代理
                String proxy = url.getParameter(Constants.PROXY_KEY);
                if (StringUtils.isNotEmpty(proxy)) {
                    registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                }
                //生成invoker对象
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
复制代码
ProxyFactory对象

此处可以看一下ProxyFactory对象,是@Adaptive支持spi的,默认实现是javassist,此处我们没配置代理就是默认走javassist代理

image.png

getInvoker方法返回了AbstractProxyInvoker对象

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
复制代码

继续回到doExportUrlsFor1Protocol方法

                //包装invoker对象
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
复制代码
protocol对象

这里再看一下protocol对象,和proxyfactory对象一样是一个spi扩展,默认是使用dubbo。
重点查看invoker对象如何转为protocol对象的。
image.png

在进入export方法之前先看一下wrapperInvoker对象,可以发现制定了protocol实现为register

image.png

RegistryProtocol

进入RegistryProtocol.export

//获取注册中心地址
URL registryUrl = getRegistryUrl(originInvoker);
//获取提供者需要注册的地址
URL providerUrl = getProviderUrl(originInvoker);

// 获取进行 注册override协议 的访问地址
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 增加override的监听器,当前配置变更,则重写配置
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
//加入缓存
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

// 根据现有的override协议,对注册地址进行改写操作
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 对当前的服务进行本地导出 
// 完成后即可在看到本地的20881端口号已经启动,并且暴露服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

// 获取注册器
final Registry registry = getRegistry(originInvoker);
复制代码

这边ctrl+h查看一下Registry关系图,可以看到有各种注册中心的Register,这边我们用的是zk的

image.png

image.png

//获取到需要注册的提供者URL 去除无效参数
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
//包装
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
        registryUrl, registeredProviderUrl);
//是否需要延迟发布
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
    //注册
    register(registryUrl, registeredProviderUrl);
复制代码
register方法 调用FallBackRegistry的模板方法
public void register(URL registryUrl, URL registeredProviderUrl) {
        //通过spi url获取对应的实现类执行方法 此处为zkRegistry
        Registry registry = registryFactory.getRegistry(registryUrl);
        //注册 次方法是模板方法,要先走到父类FallBackRegistry中
        registry.register(registeredProviderUrl);
    }
复制代码
FallBackRegistry.register标记已注册url调用zk注册节点
public void register(URL url) {
    //走父类AbstractRegister方法 标记为已注册
    super.register(url);
    //移除失败已注册URL
    removeFailedRegistered(url);
    //移除失败未注册URL
    removeFailedUnregistered(url);
    try {
        //子类实现
        doRegister(url);
复制代码
zookerRegistry.doRegister注册节点
public void doRegister(URL url) {
    try {
        //创建节点
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
复制代码

toUrlPath方法

private String toUrlPath(URL url) {
    //分类地址(/dubbo/接口路径/角色(生产、消费))+"/" + url字符串  
    return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
复制代码

toCategoryPath方法

private String toCategoryPath(URL url) {
    //  /dubbo/接口路径+"/"+角色
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}
复制代码

toServicePath方法

private String toServicePath(URL url) {
    //接口路径
    String name = url.getServiceInterface();
    if (Constants.ANY_VALUE.equals(name)) {
        return toRootPath();
    }
    //目录+接口路径
    return toRootDir() + URL.encode(name);
}
复制代码

回到FallBackRegistry.register方法

    } catch (Exception e) {
        Throwable t = e;

        // 启动检测打开,记录日志或者抛出异常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // 异步重试
        addFailedRegistered(url);
    }
}
复制代码

回到RegistryProtocol.export方法

    providerInvokerWrapper.setReg(true);
}

// 对override协议进行注册,用于在接收到override请求时做适配,这种方式用于适配2.6.x及之 前的版本(混用)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
//返回exporter对象
return new DestroyableExporter<>(exporter);
复制代码

回到ServiceConfig.export方法


            }
        } else {
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

            Exporter<?> exporter = protocol.export(wrapperInvoker);
            //加入export集合
            exporters.add(exporter);
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        MetadataReportService metadataReportService = null;
        if ((metadataReportService = getMetadataReportService()) != null) {
            metadataReportService.publishProvider(url);
        }
    }
}
复制代码

provider总结

从上述流程可知道 首先调用ServiceConfig的doExportUrlsFor1Protocol方法,然后由proxyFactory的getInvoke方法生成invoker对象,再由Protocol.export方法将invoker转为exporter对象,并通过Registry.register方法注册服务信息到zk。

服务消费

引用服务时序

下图为官网的引用服务时序图
image.png

服务消费者消费一个服务的详细过程

image.png
上图是服务消费的主过程:

首先 ReferenceConfig 类的 init 方法调用 Protocol 的 refer 方法生成 Invoker 实例(如上图中的红色部分),这是服务消费的关键。接下来把 Invoker 转换为客户端需要的接口(如:HelloWorld)。

ReferenceConfig.get 调用init方法

首先通过时序图锁定方法,看源码一定要有目的性的看。

public synchronized T get() {
        checkAndUpdateSubConfigs();
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }
复制代码

可以看到一些判断检查,主要的方法就是init()方法

ReferenceConfig.init

首先和ServiceConfig一样先做一些数据准备以及校验,数据如下:

image.png
这边直接看重点方法creatProxy

ReferenceConfid.creatProxy 生成可用代理对象

这一步是先通过protocol转为invoker对象,然后通过proxyFactory生成可用的impl对象

private T createProxy(Map<String, String> map) {
    //是否本地调用 如果没有URL就且本地有缓存 选本地
    if (shouldJvmRefer(map)) {
        URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        invoker = refprotocol.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        // 指定URL则进入 这边没有制定了走esle
        if (url != null && url.length() > 0) { 
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        url = url.setPath(interfaceName);
                    }
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } 
        //从注册中心配置中获取URL
        else { 
            //检查注册中心
            checkRegistry();
            //获取注册地址 传入false表示是否是提供方
            List<URL> us = loadRegistries(false);
            if (CollectionUtils.isNotEmpty(us)) {
                for (URL u : us) {
                    //监听URL存在 放入map
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    //拼接结果如下
                    //refer -> application%3Ddubbo-service-consumer%26default.check%3Dfalse%26
                    //default.lazy%3Dfalse%26default.sticky%3Dfalse%26dubbo%3D2.0.2%26
                    //interface%3Dcom.study.service.HelloDubbo%26lazy%3Dfalse%26loadbalance%3Drandom%26methods%3DsayHello%26pid%3D5113%26
                    //register.ip%3D172.16.8.233%26release%3D2.7.1%26side%3D
                    //consumer%26sticky%3Dfalse%26timestamp%3D1618564873618
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls.isEmpty()) {
               ...
            }
        }
        
        //如果是一个URL 直接生成invoker对象
        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
复制代码
RegistryProtocol.refer方法 此处和provider一样走的是RegistryProtocol
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = URLBuilder.from(url)
                .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
                .removeParameter(REGISTRY_KEY)
                .build();
        //获取注册中心对象 这里是ZookeeperRegistry
        Registry registry = registryFactory.getRegistry(url);
        //判断是不是RegistryService类型
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        //解析刚刚加入的refer属性
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

复制代码
RegistryProtocol.doRefer 真正生成invoker对象的方法
 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    //生成Directory对象
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    //设置注册中心对象
    directory.setRegistry(registry);
    //设置协议对象
    directory.setProtocol(protocol);
    // 获取所有的refer
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
         //设置已注册消费者URL
         directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        //注册消费者 在zk中新增节点
        registry.register(directory.getRegisteredConsumerUrl());
    }
    //构建路由链
    directory.buildRouterChain(subscribeUrl);
    //订阅各个目录 并触发监听修改信息
    //RegistryDirectory实现了NotifyListener接口,会调用notify
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
复制代码
RegistryProtocol.subscribe

中间很多步骤,此处省略非重点步骤,如下是调用链

image.png

RegistryDirectiry.refreshInvoker

走到toInvokers方法,截取重点方法片段

//refreshInvoker方法
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
    ...
}

List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
...
//此处将生成好的invoke放入RegistryDirectory中
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
...
}

//toInvokers方法
//本地获取
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
    try {
        boolean enabled = true;
        ...
        if (enabled) {
            //生成invoker对象 ,传入的参数一个invoker对象,一个消费者url 一个提供者url
            invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
        }
    } 
    ...
}
复制代码
protocol.refer(serviceType, url)

是通过dubboProtocol的refer方法

image.png

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    //传入服务接口类型,消费者url,消费者client,以及invoker集合
    //生成DubboInvoker对象
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}
复制代码

最后registryDirectory中invoker信息如下
image.png
回到RegistryProtocol.doRefer方法

    //利用cluster封装directory对象,此时的directory已经存有了router provider consumer信息
    Invoker invoker = cluster.join(directory);
    //提供者消费者记录信息
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
复制代码

此处生成完invoker对象,信息如下

image.png

回到ReferenceConfig.createProxy方法

        }
        //多个URL
        else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // 用最后一个注册中心的地址
                }
            }
            if (registryURL != null) { // registry url is available
                // 集群模式下使用 RegistryAwareCluster 
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
                // 创建StaticDirectory实例,并由cluster合并invker暴露出一个invoker进行使用
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url, must be direct invoke.
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }
    ...
    // 创建代理对象
    return (T) proxyFactory.getProxy(invoker);
}
复制代码

consumer总结

消费端的大体流程是这样的,首先是reference.init方法,其中比较重要的方法就是createProxy方法,这个方法 主要做的事情1.生成远程调用的invoker对象。2.将invoker对象转为proxy代理对象。

其中生成invoker对象是通过RegistryProtocol来实现的。
此处省略本地URL判断操作

  1. 执行RegistryProtocol.dorefer方法里面新建了RegistryDirectory对象。
  2. 执行RegistryDirectory.subscribe方法监听了provider、router等节点,这样就得到provider配置
  3. 执行Registry.doNotify方法触发RegistryDirectory.refreshInvoker方法
  4. 再由RegistryDirectory.toInvokers方法 调用DubboProtocol.refer方法生成invoker对象存入RegistryDirectory对象中,
  5. 并通过RegistryProtocol.dorefer方法中的CLustor对象合并invoker,暴露单个可使用对象最后
  6. 最后再由ProxyFactory生成代理对象返回。

URL解析

protocol://host:port/path?key=value&key=value
复制代码

URL主要有以下几部分组成:

  • protocol: 协议,一般像我们的 provider 或者 consumer 在这里都是人为具体的协议
  • host: 当前 provider 或者其他协议所具体针对的地址,比较特殊的像 override 协议所指定的host就是 0.0.0.0 代表所有的机器都生效
  • port: 和上面相同,代表所处理的端口号
  • path: 服务路径,在 provider 或者 consumer 等其他中代表着我们真实的业务接口
  • key=value: 这些则代表具体的参数,这里我们可以理解为对这个地址的配置。比如我们 provider中需要具体机器的服务应用名,就可以是一个配置的方式设置上去。

注意:Dubbo中的URL与java中的URL是有一些区别的,如下:

  • 这里提供了针对于参数的 parameter 的增加和减少(支持动态更改)
  • 提供缓存功能,对一些基础的数据做缓存.
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享