解读服务端启动流程

图片[1]-解读服务端启动流程-一一网
更多,请关注:liangye-xo.xyz/

解读服务端启动流程

先来个 demo:

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            // 服务端启动器
            ServerBootstrap b = new ServerBootstrap();
            // bossGroup - ServerChannel会去使用
            // workerGroup - 就有 ServerChannel产生的客户端 Channel会去使用
            b.group(bossGroup, workerGroup)
             // 设置服务端 Channel类型, 内部创建出来一个反射工厂 ReflectiveChannelFactory, 其提供了一个 newInstance方法用于创建 Channel实例
             .channel(NioServerSocketChannel.class)
             // 保存 Server端的自定义选项
             .option(ChannelOption.SO_BACKLOG, 100)
             // 配置用户自定义的 Server端 pipeline处理器, 后续创建出来 NioServerChannel实例后, 会去将用户自定义的 handler添加到其 pipeline中去
             .handler(new LoggingHandler(LogLevel.INFO))
             // 配置服务端上连接进来的客户端, 设置客户端 Channel内部的 Pipeline初始信息
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     // 获取服务端 pipeline
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // 以上步骤并没有创建出任何的 Channel, 只是将后续创建 Channel的配置信息保存到 ServerBootstrap中去而已


            // Start the server.
            // b.bind(port) 这里实际上返回来的是与 bind操作相关的 promise - 当 register0()对应操作结束时, 实现 channel注册与 bind的对接
            // promise.sync() - 阻塞当前主线程直到绑定相关操作执行完毕, 才会被唤醒
            ChannelFuture f = b.bind(PORT).sync();

            /** 在这一步进行一下调试, 会发现 f.isSuccess() == true **/

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
复制代码

doBind

    // 真正的去 bind方法
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // regFuture其实就是 register()返回来的 promise - 关联异步任务 register0() - Channel相关的 EventLoop会去执行
        final ChannelFuture regFuture = initAndRegister();
        //以服务端启动来讲, 这里是 NioServerSocketChannel
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // 判断 Channel是否已经注册完毕
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else { // 来分析这个分支
            // Registration future is almost always fulfilled already, but just in case it's not.
            // 这一步, 主线程来创建了一个 promise - 其实就是来与注册任务执行结果对接起来
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // 这里又来实现异步操作了
            // 为 register0对应的 promise添加监听器(其实, 说是 "回调对象更为恰当一些"), 实现任务完后的回调 - EventLoop会去执行
            regFuture.addListener(new ChannelFutureListener() {
                // 可以看到, 此回调方法中主要来处理可注册任务失败与否对应的逻辑处理
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // 获取
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause); // 表示注册失败
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered(); // 表示注册成功

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            // 这里来返回了与 bind操作相关的 promise
            return promise;
        }
    }
复制代码

bind之 initAndRegister

    // 顾名思义, 进行 Channel的初始化及其注册
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // channelFactory - ReflectiveChannelFactory
            // 以服务端启动来将, 这里就是通过反射来创建 NioServerSocketChannel实例(调用无参构造)
            // 以客户端连接来说, 这里通过反射创建的便是 NioSocketChannel实例
            // 对于服务端主要干的事:设置 Channel非阻塞、内部创建出来了 pipeline(head、tail)
            // 以及设置 Channel感兴趣事件:Accept,创建出 Unsafe实例,类型为 NioMessageUnsafe
            channel = channelFactory.newChannel(); // 这里来获取的是 Netty层面的 Channel - 实际上是 Jdk层面的 Channel的封装
            // 这一步主要来给当前服务端 Chanel的 pipeline添加了 ChannelInitializer
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // config包含着 ServerBootstrap
        // group()这里实际上获取的是 ServerBootstrap中的 bossGroup - NioEventLoopGroup
        // 由此可知, 这里最终来调用的是 NioEventLoopGroup.register(channel)
        ChannelFuture regFuture = config().group().register(channel);


        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
复制代码

可以看到,代码中有去调用了 channelFactory.newChannel(),这里的 channelFactory是 ReflectiveChannelFactory类型,在 channel方法中进行了指定

Channel创建

# ReflectiveChannelFactory.newChannel()

    @Override
    public T newChannel() {
        try {
            // 通过反射来去创建 clazz的实例
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
复制代码

可以看到,这里通过反射进行了实例的构造,而目前是服务器启动流程,结合 channel方法,不难想出这里实际上调用的是 无参的 NioServerSocketChannel构造

    public NioServerSocketChannel() {
        // newSocket - 创建出 jdk层面的 ServerSocketChannel
        // Netty需要对 Jdk层面的 Channel进行封装成 Netty层面的 Channel
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    // 这里来创建 jdk层面的 ServerSocketChannel
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            // 这里来通过 jdk层面的 SelectorProvider创建出 jdk层面的 ServerSocketChannel
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
复制代码

我们知道,通过 SelectorProvidor.openServerSocketChannel()来创建出来的实际上是 Jdk层面的 ServerSockerChannel,而这是 NioServerSocketChannel的构造,不难得出结论:Netty的 NioServerSocketChannel实际上就是对 Nio的 ServerSockerChannel的装饰增强

继续看 this的调用

    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 参数一:parent - null
        // 参数二:jdk层面的 ServerSocketChannel
        // 参数三:感兴趣的事件 Accept - 服务端, 当前服务端 Channel最终是要被注册到 Selector上去, 因此需要此信息
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

	// AbstractNioMessageChannel是NioServerSocketChannel的父类
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        // 参数一:parent - null
        // 参数二:jdk层面的 ServerSocketChannel
        // 参数三:感兴趣的事件 Accept - 服务端, 当前服务端 Channel最终是要被注册到 Selector上去, 因此需要此信息
        super(parent, ch, readInterestOp);
    }

	/** AbstractNioChannel是AbstractNioMessageChannel的父类 **/
    /** 服务端类型 **/
    // 参数一:parent - null
    // 参数二:jdk层面的 ServerSocketChannel
    // 参数三:感兴趣的事件 Accept - 服务端, 当前服务端 Channel最终是要被注册到 Selector上去, 因此需要此信息
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        // 注意, 这里只是来进行将 Acctpt事件对应值赋值到变量上而已, 还没有进行真正的注册!
        this.readInterestOp = readInterestOp;
        try {
            // 配置当前 Channel为非阻塞状态 - 这也是使用多路复用选择器 selector的前提
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                            "Failed to close a partially initialized socket.", e2);
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
复制代码

可以看到,上述来设置 channel状态为非阻塞,这不就是来实现 多路复用的前提吗?

即,NioServerSocketChannel构造中已设置自身为了非阻塞状态

调用 super,来到 AbstractChannel构造中:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        // 为每个 Channel创建 ChannelId对象 - Netty层面
        id = newId();
        // 对于服务端而言, 这里是:NioMessageUnsafe实例
        unsafe = newUnsafe();
        // 创建出来当前 Channel内部的 pipeline
        // 创建出来的 pipeline内部有两个默认的处理器:HeadContext、TailContext
        // Head <---> Tail
        pipeline = newChannelPipeline();
    }
复制代码

可以看到,在创建 Channel的过程中,将 pipeline进行了初始化:newChannelPipeline

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        // 可以看到, 创建出来的 pipeline不是空的, 里面默认有两个处理器:head、tail

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
复制代码

可以看到,pipeline默认初始化时附带了两个 handler:Head、Tail

其实是:Head <—> Tail的关系

此时,已经完成了 Channel的创建,接下来进行 Channel的初始化:init()

Channel初始化

    @Override
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());
        // 获取服务端的 pipeline
        ChannelPipeline p = channel.pipeline();
        // workerGroup
        final EventLoopGroup currentChildGroup = childGroup;

        /** 以下便是来获取为客户端配置的一些属性参数等 **/

        // ChannelInitializer
        final ChannelHandler currentChildHandler = childHandler;
        // 获取 childOption设置的参数
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        // 获取 childAttrs设置的参数
        // Netty的 Channel都是实现了 AttributeMap接口的, 因此可以在启动类中配置一些自定义数据, 这样创建出来的 Channel实例, 就会包含这些数据了
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

        // 有这步, 可以看出服务端 pipeline:head <-> ChannelInitializer <-> tail
        // 简单理解:ChannelInitializer就是时压缩包, 当 Channel被激活时, 该压缩包就会被解压, 解压后进行移除
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                // 获取服务端 pipeline
                final ChannelPipeline pipeline = ch.pipeline();
                // 获取在服务端配置的 handler - 这里指的便是 LoggingHandler
                ChannelHandler handler = config.handler();
                // 将 handler添加到服务端的 pipeline中去
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
复制代码

简单来看,init()主要干了一件事:为 ServerSocketChannel的 pipeline中添加了一个特殊的 handler:ChannelInitializer,可以把它理解为是一个特殊的 handler,即 压缩包

// ChannelInitialize本身不是 handler, 而是通过了 handlerAdapter去适配了 handler
// 其存在意义:延迟初始化 pipeline, 什么时候进行初始化 ? 当 pipeline上的 Channel被激活时, 真正添加 handler的逻辑才会去执行
// 简单理解:ChannelInitializer就是时压缩包, 当 Channel被激活时, 该压缩包就会被解压, 解压后进行移除
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
复制代码

Channel注册

        // config包含着 ServerBootstrap
        // group()这里实际上获取的是 ServerBootstrap中的 bossGroup - NioEventLoopGroup
        // 由此可知, 这里最终来调用的是 NioEventLoopGroup.register(channel)
        ChannelFuture regFuture = config().group().register(channel);
复制代码

# SingleThreadEventLoop.register

    // 返回值 ChannelFuture, 可知 register是可以来实现异步操作的
    @Override
    public ChannelFuture register(Channel channel) {
        // 可以看到, 这里对 channel进行了些许封装 - 赋予 Channel可执行异步操作的能力(监听器 - 关联事件完成后, 自动去执行回调)
        return register(new DefaultChannelPromise(channel, this));
    }
复制代码
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // 对于服务端:
        // promise.channel() - NioServerSockerChanel
        // promise.channel().unsafe() - NioMessageUnsafe - 这块调试一下就能知道结果了
        // 参数一:NioEventLoop 参数二:以服务端启动分析, 这块包含的便是 NioServerSocketChannel的 ChannelPromise
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
复制代码

接着来到,AbstractChannel内部类 AbstractUnsafe.register()

        // 参数一:NioEventLoop 参数二:以服务端启动分析, 这块包含的便是 NioServerSocketChannel的 ChannelPromise
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            // 判断当前 Channel是否有注册过
            if (isRegistered()) {
                // 通过 promise设置失败结果, 并且会去回调监听者, 执行失败的逻辑
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }

            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            // 因为当前方法是在 Unsafe类中, 因此这里通过 AbstractChannel.this去获取外部对象, 这里我们来考虑是:NioServerSocketChannel的场景
            // 这里就是来绑定下关系 (Channel & NioEventLoop), 后续 Channel上的事件或者任务都会依赖当前 EventLoop
            AbstractChannel.this.eventLoop = eventLoop;

            // 这里来判读当前线程是不是与 EventLoop绑定的线程
            // 这块通常不会成立, 以服务端启动分析, 这里线程便是主线程, 非 EventLoop中所绑定线程
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else { // 这块设计十分巧妙, 如果当前线程不是与 EventLoop绑定的线程, 那么就将注册的任务转交给 EventLoop去执行, 添加到 EventLoop的任务队列中去
                // 这也就实现了单线程下线程安全, 且也实现了主线程执行的非阻塞
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise); // 以服务端来讲, 这里的 promise包含着 NioServerSocketChannel
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
复制代码

由上述不难看出,Channel的注册任务并非是由主线程去完成的,而是由 Channel所绑定的 EventLoop去完成,即将 Channel注册作为一个任务提交到 EventLoop任务队列中去

不得不说,这块设计比较巧妙,一方面保证多线程下单线程执行的线程安全,另一方面实现了 Channel注册操作的异步化,即不阻塞 Main线程

注册操作结果如何得知?通过 NioServerSocketChannel.bind()返回了 ChannelFuture,这里实际上就是 register操作对应的 ChannelPromise,调用 isSuccess()就能够知道对应操作的执行结果了,这不就实现了异步操作吗?

至此,bind之 initAndRegister解读完毕!

解读 eventLoop中线程创建时机

经过前述分析,在 NioServerSocketChannel创建时,完成了对其内部 ChannelPipeline的创建,并且在 init()方法中对 ChannelPipeline进行了初始化,即其默认会有两个处理器:HeadContext、TailContext

但是,我们并没有看到 NioServerSocketChannel相关的 EventLoop中线程的创建,而 Channel与 EventLoop(BossGroup所管理的)的绑定是在 register()方法中的,那么 thread的创建究竟是在什么时候?

其实是在第一次往 EventLoop提交异步任务时,会去创建出线程

以服务端启动流程来将,第一次提交的任务:

# AbstractChannel.register()

...
                // 这里来判读当前线程是不是与 EventLoop绑定的线程
            // 这块通常不会成立, 以服务端启动分析, 这里线程便是主线程, 非 EventLoop中所绑定线程
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else { // 这块设计十分巧妙, 如果当前线程不是与 EventLoop绑定的线程, 那么就将注册的任务转交给 EventLoop去执行, 添加到 EventLoop的任务队列中去
                // 这也就实现了单线程下线程安全, 且也实现了主线程执行的非阻塞
                try {
                    // 须清楚一点, 到这里为止, eventLoop对应的线程尚未被创建, 线程的创建时机是怎样的 ?
                    // 当往 eventLoop中第一次提交任务时, 会去创建线程!
                    // 提交了异步任务 ①
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise); // 以服务端来讲, 这里的 promise包含着 NioServerSocketChannel
                        }
                    });
                } 
复制代码

显然,由此此时 EventLoop线程没有创建,此时执行的线程定是 Main线程

来看看 execute()方法中主要干了啥

SingleThreadEventExecutor.execute

创建 NioEventLoopGroup时,其内 child数组实例化时对应的元素便是 SingleThreadEventExecutor实例

    // 以服务端启动分析, 这里的 task便是服务端 channel的注册任务
    @Override
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }

    // 以服务端启动分析, 这里的 task便是服务端 channel的注册任务
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task); // 往任务队列中新增任务
        // 第一次这块会是 true, 即 Main线程来执行这块逻辑
        if (!inEventLoop) {
            startThread(); // 关键
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
复制代码

startThread()方法是关键

    private void startThread() {
        // 由此来保证 eventLoop对应线程不会重复被创建 - 当然第一次这能走得通
        if (state == ST_NOT_STARTED) {
            // cas设置 state
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread(); // 第一次走到这时, 在此方法中会来完成 eventLoop对应线程的创建
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
复制代码

可以看到,start == ST_NOT_STARTED也就保证了该方法只会执行一次

doStartThread(),我们主要来看关键代码:

    private void doStartThread() {
        assert thread == null;
        // executor这个是关键:在 Group构造时分析过了, 这里的 executor类型是 ThreadPerTaskExecutor,
        // 其内部包含着线程工厂(DefaultThreadFactory类型), 以此来进行线程的创建.
        executor.execute(new Runnable() {
            // 这里设计得挺好的, 通过往 executor中提交任务, 提交的任务会去创建出一个线程来执行
            @Override
            public void run() {
                // 将当前创建出来的线程赋予到 thread上, 依赖完成对 EventLoop中线程的绑定与创建
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                ...
复制代码

可以看到,这一步来调用了 executor.execute()

executor是什么?是 ThreadPerTaskExecutor实例,详情可以去看 NioEventLoopGroup创建时对其内部管理的 NioEventLoop的创建

executor其内部属性 ThreadFactory,会去真正进行 thread的创建

# ThreadPerTaskExecutor

    // 可以看到, 每提交一个任务会去创建一个线程并启动它
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
复制代码

可以看到,这里来完成了对线程的创建,并且在执行任务时完成了对 EventLoop中 thread的绑定

至此,我们知道了 eventLoop中线程的创建时机,便是在第一次往 eventLoop中添加异步任务时会去创建,当然,对应线程只会被创建一次

再加点彩蛋,来看看线程创建时会去干啥吧!

# DefaultThreadFactory.newThread

    @Override
    public Thread newThread(Runnable r) {
        // 创建出线程, 并为线程指定名称 - 类似 nioEventLoopGroup-2-1
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
复制代码

这一步完成了对所创建线程的一个命名,如我们所常见的:nioEventLoopGroup-2-1

至于 prefix的属性值的赋予,其实也是在 Group的构造时被赋值了的

至此,我们已经清楚了 EventLoop中线程的创建时机

解读 ChannelInitializer

在 ServerBootstrap.init()中去添加了 ChannelInitializer

    @Override
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());
        // 获取服务端的 pipeline
        ChannelPipeline p = channel.pipeline();
        // workerGroup
        final EventLoopGroup currentChildGroup = childGroup;

        /** 以下便是来获取为客户端配置的一些属性参数等 **/

        // ChannelInitializer - 这个是为客户端所配置的
        final ChannelHandler currentChildHandler = childHandler;
        // 获取 childOption设置的参数
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        // 获取 childAttrs设置的参数
        // Netty的 Channel都是实现了 AttributeMap接口的, 因此可以在启动类中配置一些自定义数据, 这样创建出来的 Channel实例, 就会包含这些数据了
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

        // 由这步, 可以看出服务端 pipeline:head <-> ChannelInitializer <-> tail
        // 简单理解:ChannelInitializer就是压缩包, 当 Channel被激活时, 该压缩包就会被解压, 解压后进行移除
        p.addLast(new ChannelInitializer<Channel>() {
            // 以服务端启动分析可知, 这里参数传递过来的实际上是 NioServerSocketChannel
            @Override
            public void initChannel(final Channel ch) {
                // 获取服务端 pipeline
                final ChannelPipeline pipeline = ch.pipeline();
                // 获取在服务端配置的 handler - 这里指的便是 LoggingHandler
                ChannelHandler handler = config.handler();
                // 将 handler添加到服务端的 pipeline中去
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // 可以看到, 这里来提交了一个异步任务 ②
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 这里来往 NioServerSocketChannel的 pipeline中新增了一个处理器 ServerBootstrapAcceptor
                        // 这个处理器十分特殊, 实际上是来执行客户端连接的一个处理器
                        // 此时 pipeline:head <-> LoggingHandler <-> ServerBootstrapAcceptor <-> tail
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
复制代码

怎么去理解 ChannelInitializer呢?

简单理解,ChannelInitializer是一个 压缩包,在一个必要的实际会去进行 解压,然后将自己进行移除

由上可以看出,再其解压时,根据前面的 demo分析可知,其往 pipeline中添加了两个处理器:LoggingHandler、ServerBootstrapAcceptor

那么这个必要的时机是什么?什么时候会去进行 ChannelInitializer的解压操作呢?

需清楚一点,此时 pipeline中:head <-> ChannelInitializer <-> tail

在正式分析解压时机之前,先来看看 addLast()中主要来干了什么

    // 对于 ChannelInitializer的 addLast是由 Main线程去执行的
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            // 这一步将 ChannelInitializer封装成了 ChannelHandlerContext
            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) { // 对于 ChannelInitializer, 此时 channel尚未注册
                newCtx.setAddPending(); // 设置 handler状态为已经入队状态
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    // 不难看出, 所谓的 addLast其实是去将 handler(可以看到, pipeline中存储着的实际上是封装后的 handler)添加到 tail的前面
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
复制代码

这里实际上还涉及了一个关键方法:callHandlerCakkbackLater

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
        // 这一步将 ChannelInitializer封装后的 ChannelHandlerContext再进行了一次封装 - PendingHandlerAddedTask
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        // 这里来执行的便是单向链表的入队操作
        if (pending == null) {
            pendingHandlerCallbackHead = task; // 可以看到, 这一步 pendingHandlerCallbackHead中存储的最里层所封装着的实际上就是 ChannelInitializer
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }
复制代码

由上可以看出,ChannelInitializer被封装成了 ChannelHandlerContext,然后添加到了 pipeline中,即所谓的 addLast()其实就是去将 handler添加到 tailContext的前面

并且,这一步将 ChannelInitializer封装后的 ChannelHandlerContext再进行了一次封装 – PendingHandlerAddedTask

有这其实也可以看出,pipeline中并不是直接去加入了 handler本身,加入的是对 handler封装后的 ChannelHandlerContext,其本身代表着便是 Handler与 Pipeline连接的一个关系

现在开始分析解压时机

其实解压时机便是,在 Channel对应的 EventLoop线程被创建之后,从任务队列中执行的第一个任务中会触发 ChannelInitializer的解压

那么第一个任务是什么时候被提交的呢?

在 AbstractChannel.register()中

        // 参数一:NioEventLoop 参数二:以服务端启动分析, 这块包含的便是 NioServerSocketChannel的 ChannelPromise
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            // 判断当前 Channel是否有注册过
            if (isRegistered()) {
                // 通过 promise设置失败结果, 并且会去回调监听者, 执行失败的逻辑
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }

            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            // 因为当前方法是在 Unsafe类中, 因此这里通过 AbstractChannel.this去获取外部对象, 这里我们来考虑是:NioServerSocketChannel的场景
            // 这里就是来绑定下关系 (Channel & NioEventLoop), 后续 Channel上的事件或者任务都会依赖当前 EventLoop
            AbstractChannel.this.eventLoop = eventLoop;

            // 这里来判读当前线程是不是与 EventLoop绑定的线程
            // 这块通常不会成立, 以服务端启动分析, 这里线程便是主线程, 非 EventLoop中所绑定线程
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else { // 这块设计十分巧妙, 如果当前线程不是与 EventLoop绑定的线程, 那么就将注册的任务转交给 EventLoop去执行, 添加到 EventLoop的任务队列中去
                // 这也就实现了单线程下线程安全, 且也实现了主线程执行的非阻塞
                try {
                    // 须清楚一点, 到这里为止, eventLoop对应的线程尚未被创建, 线程的创建时机是怎样的 ?
                    // 当往 eventLoop中第一次提交任务时, 会去创建线程!
                    // 提交了异步任务 ①
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise); // 以服务端来讲, 这里的 promise包含着 NioServerSocketChannel
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
复制代码

register0()方法实际上执行者是 EventLoop对应的线程

        // 须清楚一点;该方法的执行是由 EventLoop来进行的, 与 Main线程无任何关系
        // 参数 promise表示注册结果的, 外部可以向它注册监听器, 以来完成注册后的逻辑
        // 这块也好理解, 只有注册成功后才能去执行 bind操作嘛. -即退出该方法后会去执行一个回调, 以此来执行 bind0()
        // 现在是来执行第一个任务, 接来下是 init()里添加的另一个异步任务,最后一个才会是 bind0()里提交的任务 ③
        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                // 表示当前  Channel是不是第一次进行注册
                boolean firstRegistration = neverRegistered;

                doRegister(); // 关键, 在此方法中完成了 channel的注册逻辑, 即将 channel注册到 selector上
                // 这里就是来设置下注册相关的状态位
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                // invokeHandlerAddIfNeeded来完成对 ChannelInitializer的解压缩, 并且将自己移除了出去
                pipeline.invokeHandlerAddedIfNeeded();// invokeHandlerAddIfNeeded中实际上会去添加异步任务 ②

                // 这一步会去回调注册相关的 promise上注册的那些 listener, 如 "主线程"在 regFuture上注册的监听者
                safeSetSuccess(promise); // 这里回调到 doBind0()方法时又会去添加了一个异步任务 ③

                // 这里传递了一个入站事件, 从 pipeline头开始传递, 此时关注的 handler可以来做一些事情
                pipeline.fireChannelRegistered();


                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                // 以服务端启动流程来进行分析:
                // 这一步实际上是来判断当前 NioServerSocketChannel是否有完成绑定
                // false - 当前是在 register0中, 只来完成了底层 ServerSocketChannel的注册而已
                // 这块也好理解, 只有注册成功后才能去执行 bind操作嘛. -即退出该方法后会去执行一个回调, 以此来执行 bind0()
                // bind()实际上也是由 eventLoop线程去干的是事
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

复制代码

关键方法,invokeHandlerAddedIfNeeded()

invokeHandlerAddIfNeeded来完成对 ChannelInitializer的解压缩, 并且将自己移除了出去

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
            // that were added before the registration was done.
            callHandlerAddedForAllHandlers();
        }
    }

    // 需清楚, 此方法的执行是由 EventLoop线程来做的
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            // This Channel itself was registered.
            registered = true;
            // 这里就是来获取存储 task的单向链表 - 以服务端启动分析, 此时获取的便是对 ChannelInitializer最外层包装后的 task
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
        // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
        // the EventLoop.
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        // 遍历单向链表, 执行结果
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }
复制代码

task.execute()执行的是什么,经过前面分析,我们知道实际上是:PenddingHandlerAddedTask.execute(),其最里层所封装的就是 ChannelInitializer

        @Override
        void execute() {
            // 这一步, 来获取到的便是 EventLoop
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx); // 会来执行此处
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                                executor, ctx.name(), e);
                    }
                    atomicRemoveFromHandlerList(ctx);
                    ctx.setRemoved();
                }
            }
        }
复制代码
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded(); // 执行这
        } catch (Throwable t) {
            boolean removed = false;
            try {
                atomicRemoveFromHandlerList(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }

    final void callHandlerAdded() throws Exception {
        // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
        // any pipeline events ctx.handler() will miss them because the state will not allow it.
        if (setAddComplete()) {
            // 以服务端启动视角来看, handler()来获取的便是 ChannelInitializer
            handler().handlerAdded(this);
        }
    }
复制代码

可以看到,接着来执行的便是:ChannelInitializer.handlerAdded()方法

# ChannelInitializer.handlerAdded()

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            // 可以看到, 此处来执行了 initChannel
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }
复制代码

关键来了,执行了 initChannel

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                // initChannel这一步真正实现了对接!
                initChannel((C) ctx.channel()); // 这一步, 以服务端启动来分析, 来执行的便是我们在 ServerBootStrap添加的 ChannelInitializer里对应的回调方法 initChannel
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {

                // 在 try方法中已经完成了对 ChannelInitializer的解压, 现在 结构:head <-> ChannelInitializer <-> LoggingHandler <-> Tail

                // 获取 pipeline
                ChannelPipeline pipeline = ctx.pipeline();
                // 显然当前 pipeline中存在 ChannelInitializer的上下对象的 ChannelHandlerContext
                if (pipeline.context(this) != null) {
                    pipeline.remove(this); // 可以看到, 这一步将 ChannelInitializer自身从 pipeline中移除出去了
                }
            }
            return true;
        }
        return false;
    }
复制代码

由上,便真正实现了真正的对接!initChannel()会去执行 ServerBootstrap.init()中所添加的 ChannelInitializer对应的 initChannel(),即这一步便实现了对 ChannelInitializer解压

并且解压后,执行了 pipeline.remove(this),this此时便是 ChannelInitializer,这又将 ChannelInitializer从 NioServerSocketChannel对应的 pipeline中移除了出去

至此,ChannelInitializer的解读完毕!

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享