Dubbo 请求全过程解析

这里debug的代码用的是github上dubbo项目的dubbo-demo里的dubbo-demo-xml下的代码。这里以默认的dubbo通信协议为debug的代码,在《Spring解析并注册Dubbo consumer端过程详解》《Spring解析并发布Dubbo provider端过程详解》中已经详细讲到Spring解析服务提供方和消费方的配置流程,这一篇看下consumer方的调用的整个过程。dubbo官网上的一篇讲dubbo的设计原则的文章,这里先贴出里面的调用链图:
在这里插入图片描述

demo里的consumer端的xml配置:

<beans 
       // xmlns:xsi是xsi标签命名空间
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       // xmlns:dubbo是dubbo标签的命名空间
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       // 当前那xml文件默认的命名空间
       xmlns="http://www.springframework.org/schema/beans"
       // xsi:schemaLocation 配置了每个命名空间对应里配置规范,用来做格式校验
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="demo-consumer"/>

    <dubbo:registry address="zookeeper://127.0.0.1:2181" timeout="6000"/>
    
    <!--    协议配置-->
    <dubbo:protocol name="dubbo"/>
    
    <!--    consumer配置-->
    <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" timeout="6000" />

</beans>
复制代码

调用入口代码:

public class Application {
    /**
     * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
     * launch the application
     */
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
        context.start();
        DemoService demoService = context.getBean("demoService", DemoService.class);
        for(int i=0;i<1000;i++){
            String hello = demoService.sayHello("world");
            System.out.println("result: " + hello);
            Thread.sleep(1000L);
        }
    }
}
复制代码

《Spring解析并注册Dubbo consumer端过程详解》中讲到dubbo:reference配置会被Spring容器解析成一个FactoryBean对象,在依赖注入或者直接调用getBean方法就会触发这个FactoryBeangetObject()方法从而返回真正的代理类,先贴出getObject()方法的结合dubbo官方的分层的完整时序图:
在这里插入图片描述

图中蓝色部分是是使用包装器模式,核心的DubboInvoker被层层包装,在调用时,就是从外层包装类层层往里调用。

这里主要分三步:

  1. consumer端发送请求
  2. provider端解析并处理请求,将结果发送给consumer端
  3. consumer端接收请求结果

1、consumer端发送请求

先贴出总时序图:
在这里插入图片描述

demoService.sayHello("world")这里的demoService实际是个代理对象,在《Spring解析并注册Dubbo consumer端过程详解》中讲到这个代理类的生成过程,这里贴出代理类反编译后的代码:

public class proxy0
implements ClassGenerator.DC,
Destroyable,
EchoService,
DemoService {
    public static Method[] methods;
    private InvocationHandler handler;

    @Override
    public Object $echo(Object object) {
        Object[] objectArray = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[0], objectArray);
        return object2;
    }

    public CompletableFuture sayHelloAsync(String string) {
        Object[] objectArray = new Object[]{string};
        Object object = this.handler.invoke(this, methods[1], objectArray);
        return (CompletableFuture)object;
    }
    
    // 实际是调用InvocationHandler的invoke
    public String sayHello(String string) {
        Object[] objectArray = new Object[]{string};
        Object object = this.handler.invoke(this, methods[2], objectArray);
        return (String)object;
    }

    @Override
    public void $destroy() {
        Object[] objectArray = new Object[]{};
        Object object = this.handler.invoke(this, methods[3], objectArray);
    }

    public proxy0() {
    }

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }
}
复制代码

可以看出我们在调用demoService.sayHello(String string)方法时,实际就是调用InvokerInvocationHandler的invoke方法,方法代码如下:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this.invoker, args);
        } else {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 0) {
                if ("toString".equals(methodName)) {
                    return this.invoker.toString();
                }

                if ("$destroy".equals(methodName)) {
                    this.invoker.destroy();
                    return null;
                }

                if ("hashCode".equals(methodName)) {
                    return this.invoker.hashCode();
                }
            } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
                return this.invoker.equals(args[0]);
            }
            // 构造RpcInvocation信息,分装了接口、方法、参数等元数据信息
            RpcInvocation rpcInvocation = new RpcInvocation(method, this.invoker.getInterface().getName(), args);
            rpcInvocation.setTargetServiceUniqueName(this.invoker.getUrl().getServiceKey());
            // 调用invoker链
            return this.invoker.invoke(rpcInvocation).recreate();
        }
    }
复制代码

实际是调用了被层层包装的invoker.invoke(rpcInvocation)方法。接着上面的代码可以看出主要逻辑就是将被调用的类、方法、参数这些元数据信息封装成RpcInvocation对象,调用生成的invoker链的invoke方法,这个invoker链的生成过程也在《Spring解析并注册Dubbo consumer端过程详解》里讲过,invoker包装链如下:
在这里插入图片描述

最终会调用到FailoverClusterInvoker.invoke(Invocation invocation)方法里,代码如下:

public Result invoke(Invocation invocation) throws RpcException {
        this.checkWhetherDestroyed();
        Map<String, Object> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation)invocation).addAttachments(contextAttachments);
        }
        // 1 通过注册中心对象获取适合的服务提供方
        List<Invoker<T>> invokers = this.list(invocation);
        // 2 根据负载均衡策略,从多个服务提供方中选择一个
        LoadBalance loadbalance = this.initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
        // 3 调用最终选中的invoker
        return this.doInvoke(invocation, invokers, loadbalance);
    }
复制代码

这里分关键的三步:

  1. 通过注册中心对象获取适合的服务提供方
  2. 根据负载均衡策略,从多个服务提供方中选择一个
  3. 调用最终选中的invoker

1.1 通过注册中心对象获取符合路由策略的服务提供方列表

List<Invoker<T>> invokers = this.list(invocation);

复制代码

最终会调用RegistryDirectory.doList(Invocation invocation)方法,主干代码如下:

public List<Invoker<T>> doList(Invocation invocation) {
        ...
            List invokers = null;

            try {
            // 通过路由链来过滤
                invokers = this.routerChain.route(this.getConsumerUrl(), invocation);
            } catch (Throwable var4) {
                logger.error("Failed to execute router: " + this.getUrl() + ", cause: " + var4.getMessage(), var4);
            }

            return invokers == null ? Collections.emptyList() : invokers;
      ...
    }
复制代码

这里需要注意的一点是,这里的服务提供方及路由配置信息都是由注册发现对象RegistryDirectory本地缓存好的,而更新动作是通过注册中心的监听回调来触发的(详见《Spring解析并注册Dubbo consumer端过程详解》)。

1.2 根据负载均衡策略选取一个服务提供者进行调用

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
        return CollectionUtils.isNotEmpty(invokers) ? (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random")) : (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
    }
复制代码

这里是通过SPI扩展机制获取所有的负载均衡策略,然后检查URL中是否有针对当前方法指定负载策略,如果有则获取对应的扩展实现类,负责使用默认的随机负载,关于SPI机制可以看这里

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        } else if (invokers.size() == 1) {
            return (Invoker)invokers.get(0);
        } else {
        // 根据负载策略选取一个服务提供者
            Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);
            if (selected != null && selected.contains(invoker) || !invoker.isAvailable() && this.getUrl() != null && this.availablecheck) {
                try {
                    Invoker<T> rInvoker = this.reselect(loadbalance, invocation, invokers, selected, this.availablecheck);
                    if (rInvoker != null) {
                        invoker = rInvoker;
                    } else {
                        int index = invokers.indexOf(invoker);

                        try {
                            invoker = (Invoker)invokers.get((index + 1) % invokers.size());
                        } catch (Exception var9) {
                            logger.warn(var9.getMessage() + " may because invokers list dynamic change, ignore.", var9);
                        }
                    }
                } catch (Throwable var10) {
                    logger.error("cluster reselect fail reason is :" + var10.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", var10);
                }
            }

            return invoker;
        }
    }
复制代码

《Spring解析并注册Dubbo consumer端过程详解》)中关于服务发现的逻辑里可以看到,当注册中心的服务提供方节点发生变化时,会通知服务发现对象来变更本地维护的invoker列表,这里会将invoker做层层包装,每一层包装都会添加新的特性,那么在调用时,也会层层调用。

1.2.1 组装Request对象,设置request的id

设置request的id是为了异步调用,这样一个请求发出后,不用阻塞下一个请求,并且返回的response也会带上这个id,从而通过id来匹配返回的response是哪个请求发出的,这样能提高吞吐量。核心代码在HeaderExchangeChannel的request方法里,代码如下:

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (this.closed) {
            throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        } else {
            // 设置id
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            // 设置异步结果,并将结果缓存到DefaultFuture的全局map中
            DefaultFuture future = DefaultFuture.newFuture(this.channel, req, timeout, executor);

            try {
            // 发送请求
                this.channel.send(req);
                // 返回异步结果
                return future;
            } catch (RemotingException var7) {
                future.cancel();
                throw var7;
            }
        }
    }
复制代码

1.2.2 封装异步结果DefaultFuture,并将异步结果缓存到全局的Map中以便后续唤醒阻塞等待结果的consumer线程

接着上面的代码,在DefaultFuture future = DefaultFuture.newFuture(this.channel, req, timeout, executor);中会将当前异步结果缓存到全局的Map中,代码在DefaultFuture的构造器内,源码如下:


// 全局Map
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap();

public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
    // 调用构造器,在构造器里会将DefaultFuture缓存到全局Map中
    DefaultFuture future = new DefaultFuture(channel, request, timeout);
    // 将线程池对象封装到DefaultFuture对象中
    future.setExecutor(executor);
    timeoutCheck(future);
    return future;
}

// 构造器方法
private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter("timeout", 1000);
        // 以Id为key,将结果缓存到全局的静态Map中
        FUTURES.put(this.id, this);
        CHANNELS.put(this.id, channel);
    }
    
  
复制代码

在后续接收到provider结果后,会通过Response的id在这个全局Map找到对应的DefaultFuture,从而找到封装到DefaultFuture对象里的线程池对象executor,往线程池的阻塞队里中提交Runnable,唤醒被AsyncRpcResult.get()阻塞的consumer线程。

1.2.3 调用NettyClient发送Request,返回异步结果AsyncRpcResult

DubboInvoker.doInvoke(Invocation invocation)代码如下:

protected Result doInvoke(Invocation invocation) throws Throwable {
        // 封装请求元数据信息
        RpcInvocation inv = (RpcInvocation)invocation;
        String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment("path", this.getUrl().getPath());
        inv.setAttachment("version", this.version);
        ExchangeClient currentClient;
        if (this.clients.length == 1) {
            currentClient = this.clients[0];
        } else {
            currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
        }

        try {
            boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
            int timeout = this.getUrl().getMethodPositiveParameter(methodName, "timeout", 1000);
            if (isOneway) {
                boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = this.getCallbackExecutor(this.getUrl(), inv);
                // 通过NettyClient发送请求
                CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply((obj) -> {
                    return (AppResponse)obj;
                });
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException var10) {
            throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var10.getMessage(), var10);
        } catch (RemotingException var11) {
            throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var11.getMessage(), var11);
        }
    }
复制代码

通过ExchangeClient来发送请求,最终是通过NettyClient发送请求,时序图如下:
在这里插入图片描述

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享