Netty源码(一)- Reactor(线程模型)

前言:

为了更好地理解本系列文章,需要读者对JAVA NIO和Netty有简单的了解。本节只对Netty中的线程模型进行整体概况,不做源码的具体分析,读者在理解这个模型后,后面的源码分析阶段,就相对容易理解了。分析版本:master分支。

一、Netty常用线程模型

大家在使用Netty进行服务端开发的时候,会定义两个NioEventLoopGroup,”bossGroup”和”workGroup”。
在Netty中”bossGroup”负责处理客户端连接,”workGroup”负责处理读写操作。这是Netty给我们提供的线程模型,但是线程模型具体的工作方式大家有过了解吗?

上面提到的Netty线程模型如下:

image.png
图中”mainReactor”对应Netty中”bossGroup”,”subReactor”对应Netty中”workGroup”。

上图在Netty中的具体实现为:Netty会创建一个单例线程池(mainReactor)用于处理客户端的连接(对应于JAVA NIO代码的ServerSocketChannel#accept),创建多个单例线程池(subReactor)用于处理Channel的读写。并且netty使用了线程懒创建优化,就是说在需要使用线程时,才会去创建线程。

所以Netty中不管是mainReactor还是subReactor,他们的线程模型都是由多个类似SingleThreadExecutor的线程池组成。每个Channel只能被一个线程池处理,但是一个线程池可以处理多个Channel。

Netty对一个连接请求的处理大致流程为:请求先进入mainReactor,当一个连接请求在mainReactor中被处理成功后,mainReactor会将该连接放入后面的subReactor中去处理(使用轮询的方式选择一个具体的单例线程池)。

在Netty中线程模型的具体实现为MultithreadEventLoopGroup。所以接下来我们具体分析一下MultithreadEventLoopGroup的源码。

二、MultithreadEventLoopGroup源码

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, int maxPendingTasks,
                                            RejectedExecutionHandler rejectedHandler, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
         // 执行任务时,最终调用的执行器(线程)
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
        }
        
        children = new EventExecutor[nThreads];
        powerOfTwo = isPowerOfTwo(children.length);
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 创建类似SingleThreadExecutor的线程池
                children[i] = newChild(executor, maxPendingTasks, rejectedHandler, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event executor", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        final FutureListener<Object> terminationListener = future -> {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
        readonlyChildren = Collections.unmodifiableList(Arrays.asList(children));
    }
复制代码

上面的代码中有两个重要的方法:executor的赋值和newChild(executor, maxPendingTasks, rejectedHandler, args)

对executor的赋值代码非常简单,就是利用DefaultThreadFactory创建一个线程,然后执行线程。代码如下:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        requireNonNull(threadFactory, "threadFactory");
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
复制代码

newChild

newChild方法最终调用SingleThreadEventLoop的构造函数来创建线程池,主要是一些参数的赋值。只需注意,最后存放任务的队列,不是使用的LinkedBlockingQueue,而是使用的org.jctools.queues.MpscChunkedArrayQueue。具体差别,可参考

三、SingleThreadEventLoop#execute

public void execute(Runnable task) {
        requireNonNull(task, "task");

        //判断task任务的提交线程和该实例中所存储的线程是否是一致
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            //重要逻辑
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }
        
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
复制代码

startThread方法源码:

private void startThread() {
        //控制了该方法只会被执行一次,也就是只会启动一个线程
        if (state == ST_NOT_STARTED) {
            //CAS
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
复制代码

doStartThread方法:

private void doStartThread() {
        assert thread == null;
        // 这里的executor默认ThreadPerTaskExecutor,可以通过MultithreadEventLoopGroup的参数进行设置
        // 真正的启动线程,这里的executor就算设置成线程池也没用,永远只会调用一个线程来执行
        executor.execute(() -> {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks. At this point the event loop
                    // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                    // graceful shutdown with quietPeriod.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }

                    // Now we want to make sure no more tasks can be added from this point. This is
                    // achieved by switching the state. Any new tasks beyond this point will be rejected.
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                this, oldState, ST_SHUTDOWN)) {
                            break;
                        }
                    }

                    // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                    // No need to loop here, this is the final pass.
                    confirmShutdown();
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(this, ST_TERMINATED);
                        threadLock.countDown();
                        int numUserTasks = drainTasks();
                        if (numUserTasks > 0 && logger.isWarnEnabled()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + numUserTasks + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        });
    }
复制代码

io.netty.channel.SingleThreadEventLoop#run:

@Override
    protected void run() {
        assert inEventLoop();
        do {
            // 连接、读写重要逻辑
            runIo();
            if (isShuttingDown()) {
                ioHandler.prepareToDestroy();
            }
            //执行任务队列里面的任务
            runAllTasks(maxTasksPerRun);
        } while (!confirmShutdown());
    }
复制代码

最后,可以看到我们关心的I/O连接、读写都在runIo方法中,最终会调用io.netty.channel.IoHandler#run,而runAllTasks则是简单执行task的run方法。对于IoHandler的分析我们将单独起一篇进行分析。
对于更多的Reactor线程模型,可参考Netty系列文章之Netty线程模型

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享