Netty 源码解读
EventLoop源码
1 实例化NioEventLoopGroup
我们丢弃服务的代码直接使用的无参的构造函数进行实例化的,那么我们就从这个构造函数开始看吧,见io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
//看看这个地方创建了一个SelectorProvider那你是不是想到了Java NIO中的Selector了呢
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
复制代码
往super里跟,看看还干了什么,见:io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object…)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//看这个地方,如果用的无参构造函数这个地方传进来的都是0,但最终这个线程数为DEFAULT_EVENT_LOOP_THREADS
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
复制代码
可以看到如果不传线程数默认的线程数是DEFAULT_EVENT_LOOP_THREADS,那这个又是多少呢?点进去看定义可以看到是计算机核数的2倍:Runtime.getRuntime().availableProcessors() * 2
。
继续往super里面跟进,见io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object…)
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//实例化executor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//EventExecutor数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//实例化数组中的成员,我们跟进去看看
children[i] = newChild(executor, args);
success = true;
}
...
}
//给这个evetLoop创建一个选择器,选择器你可以理解为有多个eventLoop时,Channel选择运行在哪个eventLoop的线程上,感兴趣的话后续我们也写个专题讲讲。
chooser = chooserFactory.newChooser(children);
}
复制代码
我们继续跟进newChild
方法,见,io.netty.channel.nio.NioEventLoopGroup#newChild
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//这个地方就实例化了一个eventLoop,我们继续跟进构造函数看看
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
复制代码
跟进构造函数,见,io.netty.channel.nio.NioEventLoop#NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
//这个super我们就不继续跟进了,你看传参executor就可以想到把executor保存成成员变量为后续给这个eventLoop启动一个线程做铺垫了
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
//我们主要看这,可以看到Java NIO的selector在这里被创建并且成为了eventLoop的成员变量了
selector = openSelector();
selectStrategy = strategy;
}
复制代码
综上,我们回顾一下实例化NioEventLoopGroup都干了什么:创建了一个大小为计算机核数2倍的eventLoop数组并实例化了每个eventLoop,最核心的就是创建了eventLoop的成员变量:executor和selector
2 eventLoop的执行逻辑
还记得在上述章节我们讲了会在创建完ServerSocketChannel的之后会注册到其中一个eventLoop上,见io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//在这个地方eventLoop开始执行,我们跟进去看
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
复制代码
我们跟进这个execute
方法,见io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
//看这个地方,因为当前我们再main线程里,所以执行这个方法启用一个新的线程
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
//上面调的是这个方法
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
//继续跟进
doStartThread();
}
}
}
//调的这里
private void doStartThread() {
assert thread == null;
//最后是通过这个ThreadPerTaskExecutor来启动的新的线程
executor.execute(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {
//首先把当前的这个线程保存在当前这个eventLoop的成员变脸thread中,看这样就完成了一个eventLoop和一个thread的绑定
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//接下来我们主要看这个地方到底干了什么
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
}
复制代码
我们继续跟进这个SingleThreadEventExecutor.this.run();
见io.netty.channel.nio.NioEventLoop#run
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//(1)这个select有没有很眼熟呢?回想一下Java NIO
select(wakenUp.getAndSet(false));
...
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//(2)这里是不是也眼熟?
processSelectedKeys();
} finally {
//(3)我们也看看这里干了什么
runAllTasks();
}
}
...
}
}
复制代码
(1)select
我们跟进这个方法见io.netty.channel.nio.NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {
//拿到Java NIO的selector
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
//这段是不是很熟悉,就是通过for循环不断轮询是否有时间发生
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
...
}
}
}
复制代码
上边的代码我们先只关心Netty其实就是调用了Java NIO的selector开始轮询是否有时间发生。之所以它写的比较复杂是为了避免一个比较有名的一个空轮询的bug,那在这里我们先不讨论,以后可以成立独立的专题来讲述一下。
(2)processSelectedKeys()
一旦有事件发生我们就可以通过processSelectedKeys()
处理事件了,跟进见:io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
//有些人可能会好奇为啥这个selectedKeys不是在selector的成员变量被Neety单拿出来了,其实这个地方Netty针对这个selector的selectedKeys做了一些优化,我们也不细说,也可以单独成立个专题说,我们还是先串流程。
if (selectedKeys != null) {
//执行这里
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
//这个地方和Java NIO依次处理发生的事件也同样一样
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//因为我们创建了NioServerSocketChannel所以我们主要看这里,要处理事件了
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
...
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//得到当前AbstractNioChannel中的unsafe成员变量
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
//下面的代码是不是又很熟悉了?就是Java NIO处理各种不同事件的代码了
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//这个地方我们还记的在Java NIO中干了什么吗?创建SocketChannel并注册读事件对不对?我们会在下一章讨论这里,即Netty是如何处理新连接接入的
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
复制代码
综上,eventLoop的执行已经讲解完,我们回顾上一章并联合Java NIO想想都做了什么?首先创建了NioServerSocketChannel,然后开启selector轮询事件,最后在selector上注册了OP_ACCEPT事件和Java NIO我们写的流程差不多呢?这样记是不是好记多了呢?