前言:
为了更好地理解本系列文章,需要读者对JAVA NIO和Netty有简单的了解。本节只对Netty中的线程模型进行整体概况,不做源码的具体分析,读者在理解这个模型后,后面的源码分析阶段,就相对容易理解了。分析版本:master分支。
一、Netty常用线程模型
大家在使用Netty进行服务端开发的时候,会定义两个NioEventLoopGroup,”bossGroup”和”workGroup”。
在Netty中”bossGroup”负责处理客户端连接,”workGroup”负责处理读写操作。这是Netty给我们提供的线程模型,但是线程模型具体的工作方式大家有过了解吗?
上面提到的Netty线程模型如下:
图中”mainReactor”对应Netty中”bossGroup”,”subReactor”对应Netty中”workGroup”。
上图在Netty中的具体实现为:Netty会创建一个单例线程池(mainReactor)用于处理客户端的连接(对应于JAVA NIO代码的ServerSocketChannel#accept),创建多个单例线程池(subReactor)用于处理Channel的读写。并且netty使用了线程懒创建优化,就是说在需要使用线程时,才会去创建线程。
所以Netty中不管是mainReactor还是subReactor,他们的线程模型都是由多个类似SingleThreadExecutor的线程池组成。每个Channel只能被一个线程池处理,但是一个线程池可以处理多个Channel。
Netty对一个连接请求的处理大致流程为:请求先进入mainReactor,当一个连接请求在mainReactor中被处理成功后,mainReactor会将该连接放入后面的subReactor中去处理(使用轮询的方式选择一个具体的单例线程池)。
在Netty中线程模型的具体实现为MultithreadEventLoopGroup。所以接下来我们具体分析一下MultithreadEventLoopGroup的源码。
二、MultithreadEventLoopGroup源码
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 执行任务时,最终调用的执行器(线程)
if (executor == null) {
executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
}
children = new EventExecutor[nThreads];
powerOfTwo = isPowerOfTwo(children.length);
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建类似SingleThreadExecutor的线程池
children[i] = newChild(executor, maxPendingTasks, rejectedHandler, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event executor", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
final FutureListener<Object> terminationListener = future -> {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
readonlyChildren = Collections.unmodifiableList(Arrays.asList(children));
}
复制代码
上面的代码中有两个重要的方法:executor的赋值和newChild(executor, maxPendingTasks, rejectedHandler, args)。
对executor的赋值代码非常简单,就是利用DefaultThreadFactory创建一个线程,然后执行线程。代码如下:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
requireNonNull(threadFactory, "threadFactory");
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
复制代码
newChild
newChild方法最终调用SingleThreadEventLoop的构造函数来创建线程池,主要是一些参数的赋值。只需注意,最后存放任务的队列,不是使用的LinkedBlockingQueue,而是使用的org.jctools.queues.MpscChunkedArrayQueue。具体差别,可参考。
三、SingleThreadEventLoop#execute
public void execute(Runnable task) {
requireNonNull(task, "task");
//判断task任务的提交线程和该实例中所存储的线程是否是一致
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
//重要逻辑
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
复制代码
startThread方法源码:
private void startThread() {
//控制了该方法只会被执行一次,也就是只会启动一个线程
if (state == ST_NOT_STARTED) {
//CAS
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
复制代码
doStartThread方法:
private void doStartThread() {
assert thread == null;
// 这里的executor默认ThreadPerTaskExecutor,可以通过MultithreadEventLoopGroup的参数进行设置
// 真正的启动线程,这里的executor就算设置成线程池也没用,永远只会调用一个线程来执行
executor.execute(() -> {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
});
}
复制代码
io.netty.channel.SingleThreadEventLoop#run:
@Override
protected void run() {
assert inEventLoop();
do {
// 连接、读写重要逻辑
runIo();
if (isShuttingDown()) {
ioHandler.prepareToDestroy();
}
//执行任务队列里面的任务
runAllTasks(maxTasksPerRun);
} while (!confirmShutdown());
}
复制代码
最后,可以看到我们关心的I/O连接、读写都在runIo方法中,最终会调用io.netty.channel.IoHandler#run,而runAllTasks则是简单执行task的run方法。对于IoHandler的分析我们将单独起一篇进行分析。
对于更多的Reactor线程模型,可参考Netty系列文章之Netty线程模型