解析 dubbo 的服务暴露过程

出镜率非常高的 RPC 框架:dubbo,相对于 spring cloud 基于 http 协议实现的远程调用,dubbo 由于采用私有协议,使其远程调用过程具有更高的效率。

服务端启动入口 ServiceBean

我们先看一下入口 ServiceBean 的继承结构:

image.png

首先映入眼帘的肯定是他实现的一大堆接口:

implements InitializingBean,
    DisposableBean, 
    ApplicationContextAware, 
    ApplicationListener, 
    BeanNameAware
复制代码

看的这个,应该马上条件反射想到 spring bean 的生命周期,我们这里只看相关的切入点:

  • BeanNameAware

位于 Bean 初始化阶段,此时 bean 已经实例化并完成了属性赋值。

  • InitializingBean

位于 Bean 初始化阶段,在 bean 初始化前置处理器之后执行。

  • DisposableBean

位于 Bean 销毁阶段,执行 bean 的一些清理工作。

这里我只需要关注 ApplicationContextAware#setApplicationContextInitializingBean#afterPropertiesSet 方法。

Spring 上下文织入点 setApplicationContext

  1. 保存 spring 上下文引用,在 ServiceBean 后面会用到。
  2. 将 spring 上下文与 dubbo 自身容器关联。
SpringExtensionFactory.addApplicationContext(applicationContext);
复制代码
  1. 通过反射将自身设置为 spring 容器事件监听。
Method method = applicationContext.getClass().getMethod("addApplicationListener", 
    new Class<?>[]{ApplicationListener.class}); // 兼容Spring2.0.1
method.invoke(applicationContext, new Object[] {this});
复制代码

真正的服务暴露,也是通过监听到 ContextRefreshedEvent 事件来触发的。

配置解析 afterPropertiesSet

服务暴露相关的配置解析及初始化集中在 afterPropertiesSet 方法内,下面我们来分别看一下以下几个步骤:

provider 配置解析

服务提供者缺省值配置。对应的配置类: org.apache.dubbo.config.ProviderConfig。同时该标签为 dubbo:servicedubbo:protocol 标签的缺省值设置。

Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? 
    null : 
    BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, 
        ProviderConfig.class, false, false);
...
复制代码

application 配置解析

应用信息配置。对应的配置类:org.apache.dubbo.config.ApplicationConfig

Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ?
    null : 
    BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, 
        ApplicationConfig.class, false, false);
...
复制代码

module 配置解析

模块信息配置。对应的配置类 org.apache.dubbo.config.ModuleConfig

Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ?
    null : 
    BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, 
        ModuleConfig.class, false, false);
...
复制代码

monitor 配置解析

监控中心配置。对应的配置类: org.apache.dubbo.config.MonitorConfig

protocol 配置解析

服务提供者协议配置。对应的配置类: org.apache.dubbo.config.ProtocolConfig。同时,如果需要支持多协议,可以声明多个 dubbo:protocol 标签,并在 dubbo:service 中通过 protocol 属性指定使用的协议。

设置服务 path

设置服务名称至 serviceConfig

 if (getPath() == null || getPath().length() == 0) {
            if (beanName != null && beanName.length() > 0 
                    && getInterface() != null && getInterface().length() > 0
                    && beanName.startsWith(getInterface())) {
                setPath(beanName);
            }
  }
复制代码

是否延迟暴露服务

如果对应服务为配置延迟暴露,则在该阶段就调用服务暴露方法 #export。

if (! isDelay()) {
    export();
}
复制代码

触发服务暴露 onApplicationEvent

在 spring Bean 的初始化阶段,已经通过反射将自己设置为 Spring event 的监听器。当收到 ContextRefreshedEvent 事件时,便会触发服务暴露。

    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
        	if (isDelay() && ! isExported() && ! isUnexported()) {
                    ...
                    export();
                }
        }
    }
复制代码

配置获取与检查阶段

检查项包括但不限于:

  • 指定接口中方法是否存在
  • 是否配置 application:name
  • 是否配置注册中心地址registry:address
  • 设置服务暴露协议,默认为 dubbo
  • 检查桩方法和 mock 方法 – 这里主要是接口实现检查

拼接暴露服务 URL

根据服务端配置,将即将暴露的服务信息拼接成 URL 的形式。
常见的 key 有:

side=provider&version=?&timestamp=?&pid=?&methods=?&name=?&port=?...
复制代码

另外,如果配置有 monitor ,还会在 URL 中添加上 monitor 相关参数。

URL monitorUrl = loadMonitor(registryURL);
复制代码

创建/获取 invoker

Invoker<?> invoker = proxyFactory.getInvoker(ref, 
    (Class) interfaceClass, 
    registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, 
        url.toFullString()));
复制代码

dubbo 中关于 invoker 具有三种实现,分别为JavassistStubProxy 和原始的JdkProxy

Invoker 实际上就是对实际调用方法的一层封装。将所有形式的方法调用统一包装在 Invoker 中。

JdkProxyFactory
---
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }
复制代码

服务暴露

协议层 – Protocol

dubbo 实际上是支持多种协议,不过这里我们只看 dubbo 协议的实现。

String key = url.getAddress();
...
ExchangeServer server = serverMap.get(key);
if (server == null) {
    serverMap.put(key, createServer(url));
} else {
    //server支持reset,配合override功能使用
    server.reset(url);
}
复制代码

根据上面的代码可以看出,同地址内仅会执行一次 createServer。也就是说,同一个服务内的所有 service 都会通过同一个 tcp 连接暴露远程服务。

在真实开启 service 之前,还会再添加一系列服务参数:

  • channel.readonly.sent

    默认开启server关闭时发送readonly事件

  • heartbeat

    开启心跳,默认间隔为 60 * 1000

  • codec

    服务编码方式,默认为 dubbo

信息交换层 – Exchanger

完成后调用 server = Exchangers.bind(url, requestHandler); 真正开启服务。

在实际运行时,会根据 URL 中 exchanger 配置选择不同的传输类。

dubbo 提供的实现类为 HeaderExchangeServer

其使用 ScheduledExecutorService 实现了一个定时触发的心跳机制。

 heatbeatTimer = scheduled.scheduleWithFixedDelay(
     new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
         public Collection<Channel> getChannels() {
             return Collections.unmodifiableCollection(
                     HeaderExchangeServer.this.getChannels() );
         }
     }, heartbeat, heartbeatTimeout),
     heartbeat, heartbeat,TimeUnit.MILLISECONDS);
复制代码

最终会调用 Transporters.bind ,进入到实际的传输层。

return new HeaderExchangeServer(Transporters.bind(url, 
    new DecodeHandler(new HeaderExchangeHandler(handler))));
复制代码

网络传输层 – Transporter

我们这里以 NettyTransporter 为例。

看一下使用 Netty 开启服务的代码:

        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
复制代码

扩展

当 nettyService 收到数据后,如何转换为方法调用

我们再看一下 dubbo 为我们创建的 ChannelPipeline

ChannelPipeline pipeline = Channels.pipeline();
...
pipeline.addLast("handler", nettyHandler);
复制代码

关键就在 nettyHandler 中。

回顾一下在信息交换层调用网络传输接口进行绑定的代码:

new HeaderExchangeServer(Transporters.bind(url, 
    new DecodeHandler(
        new HeaderExchangeHandler(handler))));
复制代码

没错,上面的 nettyHandler 实际就是 DecodeHandler -> HeaderExchangeHandler -> DubboProtocol$ExchangeHandlerAdapter

第一步先由 DecodeHandler 执行反序列化:

if (message instanceof Request) {
    decode(((Request)message).getData());
}

if (message instanceof Response) {
    decode( ((Response)message).getResult());
}

handler.received(channel, message);
复制代码

然后,再交给 HeaderExchangeHandler 做方法的 request/response 路由。

            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
复制代码

最后,会进去到 DubboProtocol 中实现的 ExchangeHandlerAdapter 内部类中:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        ...
        Invocation inv = (Invocation) message;
        Invoker<?> invoker = getInvoker(channel, inv);
        ...
        return invoker.invoke(inv);

    }
复制代码

上面我们已经看到,invoker 就是 dubbo 对实际方法调用的封装。

所以在这里,也就是根据 serviceKey 获取到 invoker 并执行调用的过程。

String serviceKey = serviceKey(
    port, 
    path, 
    inv.getAttachments().get(Constants.VERSION_KEY), 
    inv.getAttachments().get(Constants.GROUP_KEY));
复制代码

exchange – 信息交换层中的同步转异步

我们在使用 RPC 框架进行方法调用时,虽然在调用方层面看似发生了一个同步方法调用。

但是,在 RPC 框架请求确实被异步发送出去的。当异步获得远程方法返回值后,又会转为同步返回给使用方。

代码位于 DubboInvoker#doInvoke 中。

protected Result doInvoke(final Invocation invocation) throws Throwable {
...
    if (isOneway) {
         ...
    } else if (isAsync) {
         ...
    } else {
         RpcContext.getContext().setFuture(null);
         return (Result) currentClient.request(inv, timeout).get();
    }
...
复制代码

currentClient.request 中,将本次调用交给数据传输层异步发送出去,然后调用 ResponseFuture#get 阻塞调用方线程。

while (! isDone()) {
    done.await(timeout, TimeUnit.MILLISECONDS);
    if (isDone() || System.currentTimeMillis() - start > timeout) {
        break;
    }
}
复制代码

后续,在接受到服务方返回数据后,会主动调用 done.signal() 唤醒调用方线程。

private void doReceived(Response res) {
...
    response = res;
    if (done != null) {
        done.signal();
    }
...
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享