导读
原创文章,转载请注明出处。
本文源码地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:带注释的netty源码
EventLoop
在netty中发挥着驱动引擎的作用,本文我们以NioEventLoop
为例分析一下EventLoop
的工作原理。
1 EventLoop线程的创建时机
还记得我们在“服务端启动”和“客户端启动”这两篇文章中都有一个重要操作吗?就是将Channel
注册到EventLoop
上。我们看AbstractChannel
的register
方法,其中有eventLoop.execute
调用,是不是很熟悉。大多数情况下(并非绝对),这里就是EventLoop
线程开始的地方。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
复制代码
咱们跟进去execute
方法,这个方法的实现在SingleThreadEventExecutor
中,我们看到这里有一个startThread
方法,看名字就知道,这里是线程真正开始的地方,一起来看看吧。
后面的!addTaskWakesUp && wakesUpForTask(task)
是怎么回事呢?
EventLoop
的实现中,有的EventLoop
实现会阻塞在任务队列上,对于这样的EventLoop
唤醒方法是向任务队列中添加一个比较特殊的任务,这样的EventLoop
中addTaskWakesUp
为ture
。
而有的EventLoop
比如NioEventLoop
不会阻塞在任务队列上,但是会阻塞在selector
上,对于这样的EventLoop
通过调用wakeup
方法唤醒,这样的EventLoop
中addTaskWakesUp
为false
。
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
复制代码
我们看看SingleThreadEventExecutor
中的wakeup
方法,这里通过向队列中添加一个特殊的task
来唤醒EventLoop
线程。
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {
taskQueue.offer(WAKEUP_TASK);
}
}
复制代码
而NioEventLoop
中覆盖了这个wakeup
方法,通过调用selector.wakeup
方法来唤醒EventLoop
线程,因为wakeup
是个重量级操作,所以netty用了一个AtomicBoolean
类型的wakenUp
变量来减少wakeup
的次数,如果已经被wakeup
了,就不再调用selector.wakeup
。
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
复制代码
startThread
方法中首先对EventLoop
的状态做了判断,如果为ST_NOT_STARTED
(未开始)状态,才调用doStartThread
方法,接着跟下去看doStartThread
方法。
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
复制代码
doStartThread
接着调用了SingleThreadEventExecutor.this.run()
方法,这个run
方法是抽象的,在这里没有实现。我们重点关注NioEventLoop
,所以我们去看NioEventLoop
中run
方法的实现。
我们前面已经讲过了这个executor
是ThreadPerTaskExecutor
,所以这里调用execute
方法会创建出一个新的线程,这个线程就是EventLoop
线程。
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
try {
SingleThreadEventExecutor.this.run();
} catch (Throwable t) {
} finally {
}
}
});
}
复制代码
2 EventLoop线程的工作内容
接下来我们要分析的逻辑中有很多关于wakenup
的magic操作,我也看不懂,非常难以理解,很多操作我觉得是没有必要的。这些地方在后来的版本中经过一次比较大的重构,逻辑更加清晰了,感兴趣的同学可以看一下这次github上的代码提交,Clean Up NioEventLoop。下面贴出的代码中已经删除了很多难以理解的,又不影响我们理解整个EvenLoop
主体逻辑的代码。
run
方法中是一个死循环,这就是EventLoop
线程的主要逻辑内容了。
每次循环之前要先调用一下selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
方法判断下一步的动作,默认实现在DefaultSelectStrategy
中。这里如果hasTasks
为true
也就是说taskQueue
中有任务要执行的话,会调用一下selectSupplier.get()
,这个selectSupplier
是NioEventLoop
中的selectNowSupplier
属性,get
逻辑非常简单,就是调用一下非阻塞的selectNow
方法。
selectStrategy.calculateStrategy
这里的整体逻辑就是,如果当前有任务要执行,就立即调用selectNow
返回一个>=0的值,这将导致run
方法直接跳出switch
去执行下面的逻辑。否则就返回SelectStrategy#SELECT
,这样run
方法将进入select(wakenUp.getAndSet(false))
执行。
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
复制代码
咱们接着看run
方法,run
方法中的三个重要操作:
select(wakenUp.getAndSet(false))
processSelectedKeys()
runAllTasks()
EventLoop
的一生都在为这3件事奔波,咱们一起来看一下。
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//select操作
select(wakenUp.getAndSet(false));
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//处理Channel事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
//处理队列中的任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
}
}
}
复制代码
2.1 select
select
中也有一个死循环操作,在循环之前首先计算出一个selectDeadLineNanos
,这是select
操作的最迟返回时间,是当前时间+下一个定时任务距离现在的时间。
deleayNanos
方法会到定时任务队列(EventLoop
的创建这篇文章中的scheduledTaskQueue
)查看队首的任务距离现在还有多久,如果没有定时任务的话,默认返回1秒。
timeoutMillis
即select
操作的超时时间,至于这里为什么加上500微秒,我也觉得很奇怪,没有理解,咱们暂且不去管它,接着往下看。
如果发现timeoutMillis
<=0说明现在有定时任务要执行了,立即调用非阻塞的selector.selectNow
方法,并且跳出循环。
咱们接着往下看是阻塞式的selector.select
操作,如果阻塞期间有任务加入,会调用wakeup
y方法,这个select
操作会立即返回。
接下来的代码咱们只关注 rebuildSelector
,这是为了修复jdk的空轮询bug而设计的,默认如果发生了512次空轮询就重建selector
。
select
循环跳出的条件大致有以下几种情况:
- 有定时任务到期了
selector.select(timeoutMillis)
操作返回非0值- 往
EventLoop
中添加任务时唤醒了阻塞的selector.select(timeoutMillis)
操作 - 出现了512次空轮询
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
//select操作的最迟返回时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//修复jdk空轮询bug
rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
} catch (CancelledKeyException e) {
}
}
复制代码
2.2 processSelectedKeys
在跳出select
循环之后又回到了run
方法,咱们接着看run
方法剩余的逻辑,首先把callcedlledKeys
设置为0,并且把needsToSelectAgain
设置为false
,这两个是控制对已经取消的SelectionKey
进行清理的变量,每次调用selector.select
或者selector.selectNow
方法都会导致selector
驱逐出已经被取消的Selectionkey
,代码执行到这里,因为刚刚执行过select
或者selectNow
方法,所以此时肯定不存在被取消的Selectionkey
(SelectionKey
的取消操作必须被EventLoop
线程执行)。
接着判断ioRatio
变量的值,ioRatio
是表示EventLoop
处理io事件的时间比例,默认值为50。这个只能大致控制处理io事件的时间和处理异步任务的时间比例,并非绝对值。
这里如果ioRatio等于100,就先处理所有的io事件,再处理所有的任务。
如果ioRatio不等于100,就先处理所有的io事件,再处理异步任务,此时处理异步任务会有一个超时时间,是根据处理io事件所消耗的时间和ioRatio计算出来的。
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//处理Channel事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
//处理队列中的任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
复制代码
我们先去看processSelectedKeys
方法,这里首先判断selectedKeys
是否为null
,如果不为null
说明selectedKeys
被netty优化过,咱们直接去看第1个分支,也就是优化过的分支。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
复制代码
processSelectedKeysOptimized
方法遍历SelectionKey
数组,对每个SelectionKey
调用processSelectedKey(k, (AbstractNioChannel) a)
方法进行处理。
如果发现被取消的key过多,默认超过256,则清空数组中的剩余元素,重新select
,重新select
时selector
会驱逐已经取消的key。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
//key == null说明已经遍历完了
if (k == null) {
break;
}
//遍历过的元素设置为null
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//处理SelectionKey
processSelectedKey(k, (AbstractNioChannel) a);
} else {
}
//如果发现被取消的Key过多,默认超过256,则清空数组中的剩余元素,重新select
if (needsToSelectAgain) {
//清空数组剩余的元素
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
//重新select,则selector会驱逐出已经取消的key
selectAgain();
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
复制代码
接下来咱们关注一下processSelectedKey
方法,这里首先判断一下Key
是否已经取消了,如果已经取消了,则调用unsafe.close
关闭Channel
。接着往下首先判断是否发生了OP_CONNECT
事件,还记得NioSocketChannel.doConnect
方法吗,如果SocketChannel
的connect
方法返回false
,还要继续调用SocketChannel
的finishConnect
方法`才能真正完全连接,这里咱们在在“客户端的启动流程”这篇文章中已经讲过了,这里不再展开。
接下来是判断是否发生了OP_WRITE
事件,OP_WRITE
事件表明TCP
缓冲区有空间可以写数据,此时调用unsafe.forceFlush
方法将AbstractUnsafe#outboundBuffer
中的数据写入到TCP
缓冲区中。
最后判断如果发生了OP_READ
或者OP_ACCEPT
事件,表明Channel
可读或者有新连接接入,此时调用unsafe.read
方法读取数据。那么有人有疑问了,OP_ACCEPT
事件为什么也能调用unsafe.read
呢,此时并没有数据可以读取啊,这里就是一个特殊之处了,专为AbstractNioMessageChannel
而设计,而NioServerSocketChannel
也是AbstractNioMessageChannel
的子类,感兴趣的同学去看一下NioMessageUnsafe#read
方法和NioServerSocketChannel#doReadMessages
方法。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//key已经取消了
if (!k.isValid()) {
unsafe.close(unsafe.voidPromise());
return;
}
try {
//在调用SocketChannel的connect方法返回false时,这里需要处理OP_CONNECT事件,在unsafe.finishConnect()方法中调用SocketChannel的finishConnect方法
//参考:io.netty.channel.socket.nio.NioSocketChannel.doConnect
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//这个表示当前tcp缓冲区可写
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//将未写入缓冲区的数据flush到缓冲区
ch.unsafe().forceFlush();
}
//OP_ACCEPT专为`AbstractNioMessageChannel`而设计
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
复制代码
至此run
方法中,processSelectedKeys
咱们已经分析完了,接下来看另外两个调用,runAllTasks()
和runAllTasks(long timeoutNanos)
,这两个方法主体逻辑差不多,只不过其中一个带有超时时间控制。
2.3 runAllTasks
咱们先看runAllTasks()
方法,这里首先调用fetchFromScheduledTaskQueue()
从scheduledTaskQueue
中将到期的定时任务拉到taskQueue
队列中,再调用runAllTasksFrom(taskQueue)
将taskQueue
队列中的所有任务执行完毕,最后调用afterRunningAllTasks()
方法执行完所有tailTasks
队列中的任务。这里用到了咱们在“EventLoop的构造”这篇中提到的3个重要的队列。
咱们还没说过tailTasks
这个队列有什么用,其实我也还没发现它有什么用,有人说它可以用来统计run
方法每次循环的时间。好吧,反正它不是重点,不必纠结。
fetchFromScheduledTaskQueue()
、runAllTasksFrom(taskQueue)
和afterRunningAllTasks()
这3个方法比较简单,咱们不再展开。
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
//从`scheduledTaskQueue`中将到期的定时任务拉到`taskQueue`队列中
fetchedAll = fetchFromScheduledTaskQueue();
//将taskQueue中的所有任务执行完毕
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
//将tailTasks中的所有任务执行完毕
afterRunningAllTasks();
return ranAtLeastOne;
}
复制代码
接着看runAllTasks(long timeoutNanos)
方法,这个方法咱们不多展开讲了,与runAllTasks()
的区别就在于多了一个超时时间,Netty这里对超时的判断做了一些优化,因为System.nanoTime
是一个很重的操作,所以这里并不是每执行一个任务就判断一下是否超时,而是每执行64个任务判断一下是否超时。
protected boolean runAllTasks(long timeoutNanos) {
//从scheduledTaskQueue将所有到期的任务拉到taskQueue中
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
//ScheduledFutureTask.nanoTime()是一个从ScheduledFutureTask的类加载开始的一个相对时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//遍历taskQueue执行任务,因为natoTime操作很重,所以每64次任务判断一次是否超过了超时时间
for (; ; ) {
safeExecute(task);
runTasks++;
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//执行所有tailTasks中的任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
复制代码
3 总结
EventLoop
的一生就是一个死循环,这个死循环中每次循环它干了3件事。
select
:选出有兴趣事件发生的Channel
processSelectedKeys
:处理Channel
上发生的io事件runAllTasks
:执行异步任务,包括scheduledTaskQueue
、taskQueue
和tailTasks
这3个队列中的任务。
关于作者
王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。