导读
原创文章,转载请注明出处。
本文源码地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:带注释的netty源码
在“BIO vs NIO”这篇文章中我们给出了使用jdk原生nio编写的客户端Hello World。还记得其中的关键步骤吗,咱们再来温习一下。
-
创建一个
SocketChannel
-
连接到服务方端口
-
将
SocketChannel
设置为非阻塞的 -
将
SocketChannel
注册到selector
上
今天我们就以这几个关键步骤为目标来看一下在netty中是怎么做的,以及在这几个步骤的中间netty又多做了哪些工作。
1 客户端引导代码
以下代码引导启动一个客户端,在本文以下内容中我们以“引导代码”指代这段程序。
- 创建一个
EventLoopGroup
,与服务端不同的是,这里不需要创建bossGroup
,因为客户端不需要处理新连接的接入,所以这里的eventLoopGroup
的作用相当于服务端的workerGroup
。 - 创建一个
BootStrap
并将eventLoopGroup
传入,设置channel
为NioSocketChannel
(对应jdkSocketChannel
)。 - 设置一个
Channel
参数和一个Channel
属性。 - 配置一个
handler
,这个handler
里我们什么也没做,仅仅是打印一些事件日志。 - 调用
bootstrap.connect
方法连接到服务端。
运行这段程序,将在控制台打印出如下结果:
HandlerAdded
ChannelRegistered
ChannelActive
/**
* 欢迎关注公众号“种代码“,获取博主微信深入交流
*
* @author wangjianxin
*/
public class com.zhongdaima.netty.analysis.bootstrap.ClientBoot {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
//设置Channel参数
.option(ChannelOption.TCP_NODELAY, true)
//设置Channel属性
.attr(AttributeKey.valueOf("ChannelName"), "ClientChannel")
.handler(new ChannelInboundHandlerAdapter(){
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelRegistered");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelActive");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("HandlerAdded");
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8000);
Channel channel = channelFuture.syncUninterruptibly().channel();
channel.closeFuture().awaitUninterruptibly();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
复制代码
2 启动过程
我们从bootstrap.connect("127.0.0.1",8000)
跟进去,这里的代码很简单,就是将host
和port
封装成InetSocketAddress
,接着调用doResolveAndConnect
方法。
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
return doResolveAndConnect(remoteAddress, config.localAddress());
}
复制代码
我们来看一下oResolveAndConnect
方法,这里的关键逻辑是调用initAndRegister
和doResolveAndConnect0
方法。
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//关键逻辑
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
//关键逻辑
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
//关键逻辑
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
复制代码
initAndRegister
方法内有关键的3步。1是通过channelFactory.newChannel()
创建一个Channel
,此处的chnnelFactory
的赋值咱们已经在“服务端启动流程”分析过,这里不再赘述;2是init(channel)
,这里init
方法的实现在Boostrap
类中;3是config().group().register(channel)
。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
}
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
复制代码
接下来咱们重点分析initAndRegister
方法内的这3个关键步骤和doResolveAndConnect0
方法。
2.1 新创建一个NioSocketChannel
channelFactory.newChannel()
调用Channel
实现类的无参构造方法创建实例,此处的实现类为NioSocketChannel
,咱们跟到该类的无参构造方法。这里与服务端启动时所使用的NioServerSocketChannel
不同的是NioServerSocketChannel
是调用provider.openServerSocketChannel()
创建一个jdk的ServerSocketChannel
,而客户端是调用provider.openSocketChannel()
创建一个jdk的SocketChannel
。到这里,我们看到了导读中提到的第1步“创建一个SocketChannel”。
最后调用到父类AbstractNioByteChannel
的构造方法。
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
复制代码
我们来看一下AbstractNioByteChannel
的构造方法,很简单,继续调用父类AbstractNioChannel
的构造方法。
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
复制代码
AbstractNioChannel
的构造方法内调用了ch.configureBlocking(false)
将Channel
设置为非阻塞的,并继续调用了父类AbstractChannel
的构造方法。到这里我们看到了导读中提到的第3步“将SocketChannel设置为非阻塞的”。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//设置为非阻塞的
ch.configureBlocking(false);
} catch (IOException e) {
}
}
复制代码
AbstractChannel
的构造方法里为Channel
分配了一个id,创建了一个Unsafe
和一个PipeLine
,Unsafe
和PipeLine
咱们后面再讲。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
复制代码
2.2 初始化Channel
init
方法的实现在Boostrap
类中,这里将我们在引导代码所设置的handler
添加到pipeline
中,再为Channel
设置一些参数(引导代码中的option(ChannelOption.TCP_NODELAY, true)
)和属性(引导代码中的attr(AttributeKey.valueOf("ChannelName"), "ClientChannel")
)。
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
//添加引导代码中所设置的handler
p.addLast(config.handler());
//设置Channel参数
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}
//设置Channel属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
复制代码
2.3 注册Channel
我们回到AbstractBootstrap
的initAndRegister
方法,接着往下看到ChannelFuture regFuture = config().group().register(channel)
,这里就是注册Channel的地方了,咱们跟进去看看。
config.group()
的返回是我们在引导代码中所设置的eventLoopGroup
。
跟到register(channel)
方法里看看,这个register
方法是抽象的,具体实现在MultithreadEventLoopGroup
中,跟进去。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
复制代码
next()
方法调用EventExecutorChooser
的next()
方法选择一个EventLoop
。EventExecutorChooser
有两个实现,分别是PowerOfTowEventExecutorChooser
和GenericEventExecutorChooser
,这两个Chooser
用的都是轮询策略,只是轮询算法不一样。如果EventLoopGroup
内的EventLoop
个数是2的幂,则用PowerOfTowEventExecutorChooser
,否则用GenericEventExecutorChooser
。
PowerOfTowEventExecutorChooser
使用位操作。
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
复制代码
而GenericEventExecutorChooser
使用取余操作。
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
复制代码
从EventLoop
的选择算法上我们可以看出,netty为了性能,无所不用其极。
chooser
属性的赋值在MultithreadEventExecutorGroup
的构造方法内通过chooserFactory
创建的。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
chooser = chooserFactory.newChooser(children);
}
复制代码
而chooserFactory
的赋值在MultithreadEventExecutorGroup
的另一个构造方法内。当我们在引导代码中通过new NioEventLoopGroup(1)
创建EventLoopGroup
时最终会调用到这个构造方法内,默认值为DefaultEventExecutorChooserFactory.INSTANCE
。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
复制代码
next()
方法选出的EventLoop
就是个SingleThreadEventLoop
了,我们跟到SingleThreadEventLoop
的register
方法,最终调用的是unsafe
的register
方法。
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
复制代码
unsafe.register
方法在io.netty.channel.AbstractChannel.AbstractUnsafe
内,我们跟下去看看。在register
方法中最主要的有两件事,一是绑定eventloop
,二是调用register0
方法。此时的调用线程不是EventLoop
线程,会发起一个异步任务。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//绑定eventloop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
//此时我们不在EventLoop内,也就是当前线程非EventLoop线程,会走到这个分支
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
//调用子类的register0方法
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
复制代码
register0
方法内主要有3步操作。
第1步是doRegister()
,这个咱们稍后说。
第2步是pipeline.invokeHandlerAddedIfNeeded()
这一步是去完成那些在绑定EventLoop
之前触发的添加handler
操作,比如我们添加了一个ChannelInitializer
,在ChannelInitalizer
的initChannel
方法中添加的Handler
,而initChannel
被channelAdded
方法调用,channelAdded
方法的调用必须在EventLoop
内,未绑定EventLoop
之前这个调用会被封装成异步任务。
这些操作被放在pipeline
中的pendingHandlerCallbackHead
中,是个双向链表,具体请参考DefaultChannelPipeLine
的addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法。
这一步调用了咱们的引导程序中的 System.out.println("HandlerAdded")
,在控制台打出"HandlerAdded"
。
第3步触发ChannelRegistered
事件。这一步调用了咱们的引导程序中的 System.out.println("ChannelRegistered")
,在控制台打出"ChannelRegistered"
。
好了,到这里我们已经知道了,为什么我们的引导程会先打出"HandlerAdded"
和"ChannelRegistered"
。
接着往下isActive()
最终调用是的jdk SocketChannel
类的isOpen()
方法和isConnected
方法,咱们不再贴出代码,读者自行查看,很简单,显然这里我们还没有完成连接建立,所以这个if
分支的代码并不会执行。
private void register0(ChannelPromise promise) {
try {
//向Selector注册Channel
doRegister();
//去完成那些在绑定EventLoop之前触发的添加handler操作,这些操作被放在pipeline中的pendingHandlerCallbackHead中,是个链表,具体请参考`DefaultChannelPipeLine`的`addLast(EventExecutorGroup group, String name, ChannelHandler handler)`方法。
pipeline.invokeHandlerAddedIfNeeded();
//将promise设置为成功的
safeSetSuccess(promise);
//触发ChannelRegistered事件
pipeline.fireChannelRegistered();
//这里并没有Active,因为此时还没建立连接
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
复制代码
接下来咱们跟进去doRegister
方法,这是个抽象方法,本例中方法实现在AbstractNioChannel
中。好了,到这里我们终于看到了导读中提到的第4步“向Selector
注册Channel
”的操作。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
复制代码
到了这里,我们在导读中说的总共4步操作中,还有第2步没有看到,在哪里呢,接着往下看。
2.4 连接到服务端
我们回到Bootstrap
类的doResolveAndConnect
方法,我们已经分析完了initAndRegister()
,因为initAndRegister
是异步的,返回结果是Future
,此时Future
有可能已经完成,也可能没有完成,这里对结果做了判断。
如果regFuture
已经完成,则直接调用doResolveAndConnect0
,否则将doResolveAndConnect0
方法的调用放在regFuture
的Listener
中,等regFuture
操作完成成,由EventLoop
线程来回调。
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
复制代码
那么又有读者疑问了,在这个if
判断完成之后到添加Listener
之间的这个时间,promise
有可能已经完成了,Listener
可能不会回调了, 奥秘在DefaultPromise
的addListener(GenericFutureListener<? extends Future<? super V>> listener)
方法里,这里注册完Listener
之后,如果发现promise
已经完成了,那么将直接调用nofityListeners
方法向EventLoop
提交异步任务(此时已经完成绑定EventLoop
),该异步任务即是回调刚刚注册的Listener
。
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
复制代码
咱们回归正题,去看BootStrap
类里的doResolveAndConnect0
方法,这里首先用AddressResolver
去解析SocketAddress
,这里的AddressResolver
默认值为io.netty.resolver.DefaultAddressResolverGroup#INSTANCE
,同样这里是异步的,和initAndRegister
那里是一样的,咱们不多讨论。
我们把重点聚集在doConnect
方法上。
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
doConnect(remoteAddress, localAddress, promise);
return promise;
}
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
promise.setFailure(resolveFailureCause);
} else {
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
复制代码
我们来看一下doConnect
方法,一般情况下,我们不指定客户端的localAddress
,所以这里localAddress
一般为null
,我们跟进去channel.connect(remoteAddress, connectPromise)
方法。
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
复制代码
再次放出这张netty整体架构图。
channel.connect(remoteAddress, connectPromise)
方法的实现在AbstractChannel
类中,这里调用了pipeline.connect(remoteAddress, promise)
,又调用到了tail.connect(remoteAddress, promise)
,这个调用最终会从tail
传递到head
(参考上边的netty整体架构图),具体怎么传递的,等咱们研究pipeLine
的时候再讲。接下来咱们直接到HeadContext
中去。
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
复制代码
HeadContext
的connect
方法如下,又委托给了unsafe
,调用unsafe.connect(remoteAddress, localAddress, promise)
方法,这个方法的实现在AbstractNioUnsafe
类中。
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
复制代码
我们来到AbstractNioUnsafe
的connect
方法,这里调用了doConnect(remoteAddress, localAddress)
方法。
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
复制代码
doConnect
方法这里的实现在NioSocketChannel
中,前面我们说过主动连接时一般localAddress
是null
,所以这里我们不再讨论doBind0
方法了,感兴趣的同学可以回到“服务端启动流程”这篇文章去看doBind0
方法。
接下来调用了javaChannel().connect(remoteAddress)
,由于此前已经把该Channel
设置为非阻塞的了,这个connect
操作是异步的,是由操作系统来进行异步完成的。所以这里的connect
方法有可能返回true
也有可能返回false
。至此,我们已经看到了导读中提到的第2步连接到服务方端口。到这里,导读中提到的所有操作我们都已经在netty中找到了。
先看一下jdk里面关于connect
这个方法的注释
If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the finishConnect method.
翻译一下
如果该channel在非阻塞模式下,调用该方法将初始化一个非阻塞的连接操作。如果连接立即完成了,例如可能发生在一个本地连接的情况下,该方法返回true。否则该方法将返回false,随后必须调用finishConnect方法完成连接。
也就是说如果connect
返回true
的话,表明连接已经完成了。如果返回false
的话还需要调用一下finishConnect
方法才能最终完成连接。
如果connect
方法返回false
还需要调用一下finishConnect
方法才能完成连接,这个调用在哪里呢?奥秘就在下一行,如果返回false
就在selectionKey
中加入SelectionKey.OP_CONNECT
兴趣事件,等到所绑定的EventLoop
发现这个Channel
上有SelectionKey.OP_CONNECT
事件发生时去调用finishConnect
方法,这个咱们一会儿分析。
如果返回true
,表明连接已经成功,咱们继续回到到AbstractNioUnsafe
的connect
方法
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
//多数情况下connected为false
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
复制代码
如果doConnect
方法返回true
,则调用fulfillConnectPromise
方法,顾名思义应该是将promise
设置成完成状态,咱们跟进去看看,doConnect
返回false
的情况咱们一会再分析。
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
复制代码
在fulfillConnectPromise
方法中,首先调用了promise.trySuccess()
方法将promise
设置为完成的,又调用了pipeline().fireChannelActive()
方法,这里最终会调用到咱们引导代码中的System.out.println("ChannelActive")
在控制台打印出ChannelActive
。
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
boolean active = isActive();
boolean promiseSet = promise.trySuccess();
if (!wasActive && active) {
pipeline().fireChannelActive();
}
if (!promiseSet) {
close(voidPromise());
}
}
复制代码
如果doConnect
方法返回false
呢,首先将AbstractNioChannel
类中的connectPromise
赋值为参数中所传来的promise
,为什么要赋这个值呢,咱们一会儿揭秘。
又继续添加了一个定时任务,这个定时任务将在连接超时时间到来时将connectPromise
设置为失败的。
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
复制代码
好了,我们前面说过doConnect
方法极大概率返回false
,连接还未完成,那么真正完成连接在哪里呢。其实咱们前面已经说过了,如果connect
方法返回false
接下来有这么一行代码selectionKey().interestOps(SelectionKey.OP_CONNECT);
添加一个SelectionKey.OP_CONNECT
兴趣事件。
这部分内容咱们前边还没介绍,我就直接给出了,至于怎么执行到这里的,咱们以后讲。
直接到NioEventLoop
的processSelectedKey
方法里看,这个方法很长,我们只截取其中的一小段。我们看到这里判断了一下是否有SelectionKey.OP_CONNECT
事件发生,如果发生了SelectionKey.OP_CONNECT
事件,则将SelectionKey.OP_CONNECT
从兴趣事件中删除,随后调用unsafe.finishConnect()
方法。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
复制代码
咱们来看一下unsafe.finishConnect()
方法,这个实现在AbstractNioUnsafe
内,调用doFinishFinishConnect()
就是本方法的关键了,fulfillConnectPromise(connectPromise, wasActive)
咱们前面已经说过了,不再分析。但是咱们要提一点,还记得前面咱们说过在AbstractNioUnsafe
的connect
方法中把参数中传来的promise
赋值给AbstractNioChannel
中的connectPromise
属性吗?这里就是原因了,如果不在属性中保存这个promise
,那么这里就无法将connectPromise
传递给fulfillConnectPromise(connectPromise, wasActive)
方法。
@Override
public final void finishConnect() {
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
} finally {
}
}
复制代码
doFinishFinishConnect()
的实现在NioSocketChannel
类中,非常简单,调用jdk的NioSocketChannel
类的finishConnect
方法。还记得咱们前面留下的问题吗?finishiConnect()
方法在哪里调用的,答案已经有了,就在这里。
@Override
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
复制代码
至此,客户端启动流程我们已经分析完毕。
3 总结
netty客户端启动流程:
-
创建一个
Channel
实例,这个过程中将Channel
设置为非阻塞的,为Channel
创建了PipeLine
和Unsafe
。 -
初始化
Channel
,为Channel
添加引导代码中所设置的handler
,并设置参数和属性。 -
注册
Channel
,为Channel
绑定一个EventLoop
并向Selector
注册Channel
。 -
调用
connect
方法连接到服务端,如果connect
返回false
将在Channel
发生OP_CONNECT
事件时调用finishiConnect
方法完成连接。
关于作者
王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。