这里debug的代码用的是github上dubbo项目的dubbo-demo里的dubbo-demo-xml下的代码。这里以默认的dubbo通信协议为debug的代码,在《Spring解析并注册Dubbo consumer端过程详解》和《Spring解析并发布Dubbo provider端过程详解》中已经详细讲到Spring解析服务提供方和消费方的配置流程,这一篇看下consumer方的调用的整个过程。dubbo官网上的一篇讲dubbo的设计原则的文章,这里先贴出里面的调用链图:
demo里的consumer端的xml配置:
<beans
// xmlns:xsi是xsi标签命名空间
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
// xmlns:dubbo是dubbo标签的命名空间
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
// 当前那xml文件默认的命名空间
xmlns="http://www.springframework.org/schema/beans"
// xsi:schemaLocation 配置了每个命名空间对应里配置规范,用来做格式校验
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-consumer"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181" timeout="6000"/>
<!-- 协议配置-->
<dubbo:protocol name="dubbo"/>
<!-- consumer配置-->
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" timeout="6000" />
</beans>
复制代码
调用入口代码:
public class Application {
/**
* In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
* launch the application
*/
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
context.start();
DemoService demoService = context.getBean("demoService", DemoService.class);
for(int i=0;i<1000;i++){
String hello = demoService.sayHello("world");
System.out.println("result: " + hello);
Thread.sleep(1000L);
}
}
}
复制代码
在《Spring解析并注册Dubbo consumer端过程详解》中讲到dubbo:reference
配置会被Spring容器解析成一个FactoryBean
对象,在依赖注入或者直接调用getBean
方法就会触发这个FactoryBean
的getObject()
方法从而返回真正的代理类,先贴出getObject()
方法的结合dubbo官方的分层的完整时序图:
图中蓝色部分是是使用包装器模式,核心的DubboInvoker
被层层包装,在调用时,就是从外层包装类层层往里调用。
这里主要分三步:
- consumer端发送请求
- provider端解析并处理请求,将结果发送给consumer端
- consumer端接收请求结果
1、consumer端发送请求
先贴出总时序图:
demoService.sayHello("world")
这里的demoService
实际是个代理对象,在《Spring解析并注册Dubbo consumer端过程详解》中讲到这个代理类的生成过程,这里贴出代理类反编译后的代码:
public class proxy0
implements ClassGenerator.DC,
Destroyable,
EchoService,
DemoService {
public static Method[] methods;
private InvocationHandler handler;
@Override
public Object $echo(Object object) {
Object[] objectArray = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[0], objectArray);
return object2;
}
public CompletableFuture sayHelloAsync(String string) {
Object[] objectArray = new Object[]{string};
Object object = this.handler.invoke(this, methods[1], objectArray);
return (CompletableFuture)object;
}
// 实际是调用InvocationHandler的invoke
public String sayHello(String string) {
Object[] objectArray = new Object[]{string};
Object object = this.handler.invoke(this, methods[2], objectArray);
return (String)object;
}
@Override
public void $destroy() {
Object[] objectArray = new Object[]{};
Object object = this.handler.invoke(this, methods[3], objectArray);
}
public proxy0() {
}
public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}
}
复制代码
可以看出我们在调用demoService.sayHello(String string)
方法时,实际就是调用InvokerInvocationHandler
的invoke方法,方法代码如下:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this.invoker, args);
} else {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return this.invoker.toString();
}
if ("$destroy".equals(methodName)) {
this.invoker.destroy();
return null;
}
if ("hashCode".equals(methodName)) {
return this.invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return this.invoker.equals(args[0]);
}
// 构造RpcInvocation信息,分装了接口、方法、参数等元数据信息
RpcInvocation rpcInvocation = new RpcInvocation(method, this.invoker.getInterface().getName(), args);
rpcInvocation.setTargetServiceUniqueName(this.invoker.getUrl().getServiceKey());
// 调用invoker链
return this.invoker.invoke(rpcInvocation).recreate();
}
}
复制代码
实际是调用了被层层包装的invoker.invoke(rpcInvocation)
方法。接着上面的代码可以看出主要逻辑就是将被调用的类、方法、参数这些元数据信息封装成RpcInvocation
对象,调用生成的invoker
链的invoke
方法,这个invoker链的生成过程也在《Spring解析并注册Dubbo consumer端过程详解》里讲过,invoker包装链如下:
最终会调用到FailoverClusterInvoker.invoke(Invocation invocation)
方法里,代码如下:
public Result invoke(Invocation invocation) throws RpcException {
this.checkWhetherDestroyed();
Map<String, Object> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation)invocation).addAttachments(contextAttachments);
}
// 1 通过注册中心对象获取适合的服务提供方
List<Invoker<T>> invokers = this.list(invocation);
// 2 根据负载均衡策略,从多个服务提供方中选择一个
LoadBalance loadbalance = this.initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
// 3 调用最终选中的invoker
return this.doInvoke(invocation, invokers, loadbalance);
}
复制代码
这里分关键的三步:
- 通过注册中心对象获取适合的服务提供方
- 根据负载均衡策略,从多个服务提供方中选择一个
- 调用最终选中的invoker
1.1 通过注册中心对象获取符合路由策略的服务提供方列表
List<Invoker<T>> invokers = this.list(invocation);
复制代码
最终会调用RegistryDirectory.doList(Invocation invocation)
方法,主干代码如下:
public List<Invoker<T>> doList(Invocation invocation) {
...
List invokers = null;
try {
// 通过路由链来过滤
invokers = this.routerChain.route(this.getConsumerUrl(), invocation);
} catch (Throwable var4) {
logger.error("Failed to execute router: " + this.getUrl() + ", cause: " + var4.getMessage(), var4);
}
return invokers == null ? Collections.emptyList() : invokers;
...
}
复制代码
这里需要注意的一点是,这里的服务提供方及路由配置信息都是由注册发现对象RegistryDirectory
本地缓存好的,而更新动作是通过注册中心的监听回调来触发的(详见《Spring解析并注册Dubbo consumer端过程详解》)。
1.2 根据负载均衡策略选取一个服务提供者进行调用
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
return CollectionUtils.isNotEmpty(invokers) ? (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random")) : (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
}
复制代码
这里是通过SPI扩展机制获取所有的负载均衡策略,然后检查URL中是否有针对当前方法指定负载策略,如果有则获取对应的扩展实现类,负责使用默认的随机负载,关于SPI机制可以看这里
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
} else if (invokers.size() == 1) {
return (Invoker)invokers.get(0);
} else {
// 根据负载策略选取一个服务提供者
Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);
if (selected != null && selected.contains(invoker) || !invoker.isAvailable() && this.getUrl() != null && this.availablecheck) {
try {
Invoker<T> rInvoker = this.reselect(loadbalance, invocation, invokers, selected, this.availablecheck);
if (rInvoker != null) {
invoker = rInvoker;
} else {
int index = invokers.indexOf(invoker);
try {
invoker = (Invoker)invokers.get((index + 1) % invokers.size());
} catch (Exception var9) {
logger.warn(var9.getMessage() + " may because invokers list dynamic change, ignore.", var9);
}
}
} catch (Throwable var10) {
logger.error("cluster reselect fail reason is :" + var10.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", var10);
}
}
return invoker;
}
}
复制代码
在《Spring解析并注册Dubbo consumer端过程详解》)中关于服务发现的逻辑里可以看到,当注册中心的服务提供方节点发生变化时,会通知服务发现对象来变更本地维护的invoker列表,这里会将invoker做层层包装,每一层包装都会添加新的特性,那么在调用时,也会层层调用。
1.2.1 组装Request对象,设置request的id
设置request的id是为了异步调用,这样一个请求发出后,不用阻塞下一个请求,并且返回的response也会带上这个id,从而通过id来匹配返回的response是哪个请求发出的,这样能提高吞吐量。核心代码在HeaderExchangeChannel
的request方法里,代码如下:
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (this.closed) {
throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
} else {
// 设置id
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// 设置异步结果,并将结果缓存到DefaultFuture的全局map中
DefaultFuture future = DefaultFuture.newFuture(this.channel, req, timeout, executor);
try {
// 发送请求
this.channel.send(req);
// 返回异步结果
return future;
} catch (RemotingException var7) {
future.cancel();
throw var7;
}
}
}
复制代码
1.2.2 封装异步结果DefaultFuture
,并将异步结果缓存到全局的Map中以便后续唤醒阻塞等待结果的consumer线程
接着上面的代码,在DefaultFuture future = DefaultFuture.newFuture(this.channel, req, timeout, executor);
中会将当前异步结果缓存到全局的Map中,代码在DefaultFuture
的构造器内,源码如下:
// 全局Map
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap();
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
// 调用构造器,在构造器里会将DefaultFuture缓存到全局Map中
DefaultFuture future = new DefaultFuture(channel, request, timeout);
// 将线程池对象封装到DefaultFuture对象中
future.setExecutor(executor);
timeoutCheck(future);
return future;
}
// 构造器方法
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter("timeout", 1000);
// 以Id为key,将结果缓存到全局的静态Map中
FUTURES.put(this.id, this);
CHANNELS.put(this.id, channel);
}
复制代码
在后续接收到provider结果后,会通过Response
的id在这个全局Map找到对应的DefaultFuture
,从而找到封装到DefaultFuture
对象里的线程池对象executor
,往线程池的阻塞队里中提交Runnable,唤醒被AsyncRpcResult.get()
阻塞的consumer线程。
1.2.3 调用NettyClient发送Request,返回异步结果AsyncRpcResult
DubboInvoker.doInvoke(Invocation invocation)
代码如下:
protected Result doInvoke(Invocation invocation) throws Throwable {
// 封装请求元数据信息
RpcInvocation inv = (RpcInvocation)invocation;
String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment("path", this.getUrl().getPath());
inv.setAttachment("version", this.version);
ExchangeClient currentClient;
if (this.clients.length == 1) {
currentClient = this.clients[0];
} else {
currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
int timeout = this.getUrl().getMethodPositiveParameter(methodName, "timeout", 1000);
if (isOneway) {
boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = this.getCallbackExecutor(this.getUrl(), inv);
// 通过NettyClient发送请求
CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply((obj) -> {
return (AppResponse)obj;
});
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException var10) {
throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var10.getMessage(), var10);
} catch (RemotingException var11) {
throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var11.getMessage(), var11);
}
}
复制代码
通过ExchangeClient
来发送请求,最终是通过NettyClient发送请求,时序图如下: