netty的bind流程中的register方法
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
复制代码
register0最终会调用到java nio的方法,如下所示,下面就是java nio的Selector那一套东西:
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
复制代码
在register方法中,有一个判断eventLoop.inEventLoop()
,这个判断在netty的源码中的很多地方都会见到,是用来判断当前执行的线程是不是eventLoop中的thread线程(就是常说的I/O线程)。
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
复制代码
这个判断可以引出netty线程的一些处理方式,在netty中:
- 一个 EventLoopGroup 当中会包含一个或多个 EventLoop 。
- 一个 EventLoop 在它的整个生命周期中都只会与唯一一个Thread进行绑定。
- 所有由 EventLoop 所处理的各种I/O事件都将在它所关联的那个Thread上进行处理。
- 一个Channel在它的整个生命周期中只会注册在一个 EventLoop 上。
- 一个EventLoop 在运行过程当中,会被分配一个或多个Channel。
重要结论1:在netty中,Channel的实现一定是线程安全的(只会与一个线程绑定,相当于单线程);基于此,我们可以存储一个Channel引用,并且在需要向远程端点发送数据时,通过这个引用来调用这个Channel的相应方法,而且消息一定会按顺序发送出去。
重要结论2:我们在业务开发过程当中,不要将长时间执行的耗时任务放到EventLoop的执行队列中,因为它将会阻塞该线程所对应的所有Channel上的其他执行任务。如果我们需要进行阻塞调用或者耗时的操作(实际开发过程中很常见),那么我们就需要使用一个专门的 EventExecutor(业务线程池)。
业务线程池通常有两种实现方式:
- 在ChannelHandler的回调方法中,使用自己定义的业务线程池
- 借助netty提供的向ChannelPipeline添加ChannelHandler时调用的addLast方法来传递EventExecutor。
说明:默认情况下(调用addLast(handler)),ChannelHandler中的回调方法都是有I/O线程所执行,如果调用 ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler… handlers)方法,那么ChannelHandler中的回调方法由参数中的group线程组来执行。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END