出镜率非常高的 RPC 框架:dubbo,相对于 spring cloud 基于 http 协议实现的远程调用,dubbo 由于采用私有协议,使其远程调用过程具有更高的效率。
服务端启动入口 ServiceBean
我们先看一下入口 ServiceBean 的继承结构:
首先映入眼帘的肯定是他实现的一大堆接口:
implements InitializingBean,
DisposableBean,
ApplicationContextAware,
ApplicationListener,
BeanNameAware
复制代码
看的这个,应该马上条件反射想到 spring bean 的生命周期,我们这里只看相关的切入点:
- BeanNameAware
位于 Bean 初始化阶段,此时 bean 已经实例化并完成了属性赋值。
- InitializingBean
位于 Bean 初始化阶段,在 bean 初始化前置处理器之后执行。
- DisposableBean
位于 Bean 销毁阶段,执行 bean 的一些清理工作。
这里我只需要关注 ApplicationContextAware#setApplicationContext
和 InitializingBean#afterPropertiesSet
方法。
Spring 上下文织入点 setApplicationContext
- 保存 spring 上下文引用,在
ServiceBean
后面会用到。 - 将 spring 上下文与 dubbo 自身容器关联。
SpringExtensionFactory.addApplicationContext(applicationContext);
复制代码
- 通过反射将自身设置为 spring 容器事件监听。
Method method = applicationContext.getClass().getMethod("addApplicationListener",
new Class<?>[]{ApplicationListener.class}); // 兼容Spring2.0.1
method.invoke(applicationContext, new Object[] {this});
复制代码
真正的服务暴露,也是通过监听到 ContextRefreshedEvent
事件来触发的。
配置解析 afterPropertiesSet
服务暴露相关的配置解析及初始化集中在 afterPropertiesSet
方法内,下面我们来分别看一下以下几个步骤:
provider 配置解析
服务提供者缺省值配置。对应的配置类: org.apache.dubbo.config.ProviderConfig。同时该标签为 dubbo:service 和 dubbo:protocol 标签的缺省值设置。
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ?
null :
BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
ProviderConfig.class, false, false);
...
复制代码
application 配置解析
应用信息配置。对应的配置类:org.apache.dubbo.config.ApplicationConfig
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ?
null :
BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
ApplicationConfig.class, false, false);
...
复制代码
module 配置解析
模块信息配置。对应的配置类 org.apache.dubbo.config.ModuleConfig
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ?
null :
BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
ModuleConfig.class, false, false);
...
复制代码
monitor 配置解析
监控中心配置。对应的配置类: org.apache.dubbo.config.MonitorConfig
protocol 配置解析
服务提供者协议配置。对应的配置类: org.apache.dubbo.config.ProtocolConfig。同时,如果需要支持多协议,可以声明多个 dubbo:protocol 标签,并在 dubbo:service 中通过 protocol 属性指定使用的协议。
设置服务 path
设置服务名称至 serviceConfig
if (getPath() == null || getPath().length() == 0) {
if (beanName != null && beanName.length() > 0
&& getInterface() != null && getInterface().length() > 0
&& beanName.startsWith(getInterface())) {
setPath(beanName);
}
}
复制代码
是否延迟暴露服务
如果对应服务为配置延迟暴露,则在该阶段就调用服务暴露方法 #export。
if (! isDelay()) {
export();
}
复制代码
触发服务暴露 onApplicationEvent
在 spring Bean 的初始化阶段,已经通过反射将自己设置为 Spring event 的监听器。当收到 ContextRefreshedEvent
事件时,便会触发服务暴露。
public void onApplicationEvent(ApplicationEvent event) {
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
if (isDelay() && ! isExported() && ! isUnexported()) {
...
export();
}
}
}
复制代码
配置获取与检查阶段
检查项包括但不限于:
- 指定接口中方法是否存在
- 是否配置
application:name
- 是否配置注册中心地址
registry:address
- 设置服务暴露协议,默认为
dubbo
- 检查桩方法和 mock 方法 – 这里主要是接口实现检查
拼接暴露服务 URL
根据服务端配置,将即将暴露的服务信息拼接成 URL 的形式。
常见的 key 有:
side=provider&version=?×tamp=?&pid=?&methods=?&name=?&port=?...
复制代码
另外,如果配置有 monitor ,还会在 URL 中添加上 monitor 相关参数。
URL monitorUrl = loadMonitor(registryURL);
复制代码
创建/获取 invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref,
(Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY,
url.toFullString()));
复制代码
dubbo 中关于 invoker
具有三种实现,分别为Javassist
,StubProxy
和原始的JdkProxy
。
Invoker
实际上就是对实际调用方法的一层封装。将所有形式的方法调用统一包装在 Invoker 中。
JdkProxyFactory
---
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
复制代码
服务暴露
协议层 – Protocol
dubbo 实际上是支持多种协议,不过这里我们只看 dubbo 协议的实现。
String key = url.getAddress();
...
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
复制代码
根据上面的代码可以看出,同地址内仅会执行一次 createServer
。也就是说,同一个服务内的所有 service 都会通过同一个 tcp 连接暴露远程服务。
在真实开启 service 之前,还会再添加一系列服务参数:
-
channel.readonly.sent
默认开启server关闭时发送readonly事件
-
heartbeat
开启心跳,默认间隔为 60 * 1000
-
codec
服务编码方式,默认为 dubbo
信息交换层 – Exchanger
完成后调用 server = Exchangers.bind(url, requestHandler);
真正开启服务。
在实际运行时,会根据 URL 中 exchanger
配置选择不同的传输类。
dubbo 提供的实现类为 HeaderExchangeServer
。
其使用 ScheduledExecutorService
实现了一个定时触发的心跳机制。
heatbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels() );
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat,TimeUnit.MILLISECONDS);
复制代码
最终会调用 Transporters.bind
,进入到实际的传输层。
return new HeaderExchangeServer(Transporters.bind(url,
new DecodeHandler(new HeaderExchangeHandler(handler))));
复制代码
网络传输层 – Transporter
我们这里以 NettyTransporter
为例。
看一下使用 Netty 开启服务的代码:
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
复制代码
扩展
当 nettyService 收到数据后,如何转换为方法调用
我们再看一下 dubbo 为我们创建的 ChannelPipeline
。
ChannelPipeline pipeline = Channels.pipeline();
...
pipeline.addLast("handler", nettyHandler);
复制代码
关键就在 nettyHandler
中。
回顾一下在信息交换层调用网络传输接口进行绑定的代码:
new HeaderExchangeServer(Transporters.bind(url,
new DecodeHandler(
new HeaderExchangeHandler(handler))));
复制代码
没错,上面的 nettyHandler
实际就是 DecodeHandler
-> HeaderExchangeHandler
-> DubboProtocol$ExchangeHandlerAdapter
。
第一步先由 DecodeHandler
执行反序列化:
if (message instanceof Request) {
decode(((Request)message).getData());
}
if (message instanceof Response) {
decode( ((Response)message).getResult());
}
handler.received(channel, message);
复制代码
然后,再交给 HeaderExchangeHandler
做方法的 request/response 路由。
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
复制代码
最后,会进去到 DubboProtocol
中实现的 ExchangeHandlerAdapter
内部类中:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
...
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
...
return invoker.invoke(inv);
}
复制代码
上面我们已经看到,invoker
就是 dubbo 对实际方法调用的封装。
所以在这里,也就是根据 serviceKey
获取到 invoker
并执行调用的过程。
String serviceKey = serviceKey(
port,
path,
inv.getAttachments().get(Constants.VERSION_KEY),
inv.getAttachments().get(Constants.GROUP_KEY));
复制代码
exchange – 信息交换层中的同步转异步
我们在使用 RPC 框架进行方法调用时,虽然在调用方层面看似发生了一个同步方法调用。
但是,在 RPC 框架请求确实被异步发送出去的。当异步获得远程方法返回值后,又会转为同步返回给使用方。
代码位于 DubboInvoker#doInvoke
中。
protected Result doInvoke(final Invocation invocation) throws Throwable {
...
if (isOneway) {
...
} else if (isAsync) {
...
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
...
复制代码
在 currentClient.request
中,将本次调用交给数据传输层异步发送出去,然后调用 ResponseFuture#get
阻塞调用方线程。
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
复制代码
后续,在接受到服务方返回数据后,会主动调用 done.signal()
唤醒调用方线程。
private void doReceived(Response res) {
...
response = res;
if (done != null) {
done.signal();
}
...
}
复制代码