从Java NIO到Netty(八)

Netty 源码解读

bind启动源码

我们的服务是通过下面代码进行绑定端口并启动的,所以我们从bind()方法开始分析

ChannelFuture f = b.bind(port).sync();
复制代码

一直跟进到io.netty.bootstrap.AbstractBootstrap#doBind方法

private ChannelFuture doBind(final SocketAddress localAddress) {
        //1 初始化和注册逻辑
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) 
            ChannelPromise promise = channel.newPromise();
            //2 端口绑定逻辑
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                         promise.setFailure(cause);
                    } else {
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
复制代码

1 初始化和注册逻辑

继续跟进initAndRegister()方法,根绝下面的注释,你类比一下Java NIO的代码,你会不会有莫名的亲切感?

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //(1)创建ServerSocketChannel
            channel = channelFactory.newChannel();
            //初始化Channel,不展开说明了
            init(channel);
        } catch (Throwable t) {
            if (channel != null) 
                channel.unsafe().closeForcibly();
            }
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        //(2)注册selector
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
复制代码

(1)创建ServerSocketChannel

回看一下抛弃服务的代码

.channel(NioServerSocketChannel.class)
复制代码

这里就是把这个NioServerSocketChannel实例化,见io.netty.channel.ReflectiveChannelFactory#newChannel

@Override
    public T newChannel() {
        try {
            //这里这个clazz指的就是NioServerSocketChannel.class,具体怎么传进来的,大家可以自己看代码,这里不介绍了。
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
复制代码

通过反射进行实例化我们就直接看NioServerSocketChannel的构造函数即可看看都干了什么,见io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()

    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        这里主要干了两件事情:
        (1)newSocket(DEFAULT_SELECTOR_PROVIDER),其实这里就是创建了Java NIO中的ServerSocketChannel
        (2this方法继续调用构造函数
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    //(2)处会调用这个构造函数
    public NioServerSocketChannel(ServerSocketChannel channel) {
        //我们主要关心这个调用父类的构造函数干了什么,看见SelectionKey.OP_ACCEPT有没有很熟悉的样子
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
复制代码

见io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel

/**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        //我们仍然继续往里跟进
        super(parent);
        this.ch = ch;
        //这个地方把SelectionKey.OP_ACCEPT事件保存到成员变量中了哦
        this.readInterestOp = readInterestOp;
        try {
            //设置成非阻塞模式,有没有感觉又很熟悉了?
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
复制代码

见io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)

    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        //这里把Java NIO的ServerSocketChannel保存进了成员变量
        this.parent = parent;
        //给这个Channel一个id
        id = newId();
        //这个先忽略,以后介绍
        unsafe = newUnsafe();
        //看看这个地方,是不是生成了一个Pipeline和一个Channel关联了?
        pipeline = newChannelPipeline();
    }
复制代码

我们来心中回顾一下创建一个NioServerSocketChannel都干了哪些事情:

  • (1)创建了一个Java NIO底层的SeverSocketChannel,作为自己的成员变量;
  • (2)给自己弄了个id;
  • (3)创建了一个Pipeline作为自己的成员变量;
  • (4)设置成了非阻塞模式。

(2)注册selector

config().group()拿到的就是我们的bossGroup(具体怎么区分的,可以自己通过丢弃服务代码b.group(bossGroup, workerGroup)跟进去看一下),那整个config().group().register(channel);我们就可以理解为在EventLoop上注册我们的Channel。
接下来我们先忽略Netty如何选择bossGroup哪个EventLoop直接跳到在EventLoop上注册的方法,见io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)

@Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //用NioServerSocketChannel的unsafe注册,注意这个地方传参是把当前的EventLoop和NioServerSocketChannel都传进去了
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
复制代码

继续跟进,见io.netty.channel.AbstractChannel.AbstractUnsafe#register

 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            
            ...
            AbstractChannel.this.eventLoop = eventLoop;

            //现在咱们再mian主线程里,所以会走else
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try { 
                    //这个地方涉及到eventLoop的启用,我们下一章讲
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            //所以会在当前eventLoop进行注册
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }
复制代码

往register0里继续跟,见io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //其他不关心继续往里跟这个方法
                doRegister();
                ...
        }
复制代码

跟进doRegister(),见io.netty.channel.nio.AbstractNioChannel#doRegister

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                //看到这段代码是不是又有莫名的亲切感,让我们回想一下《从Java NIO到Netty(二)》的内容
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
               ...
        }
    }
复制代码

在上述代码中,javaChannel()拿到的就是Java NIO的ServerSocketChannel,这个register方法也是Java NIO的那个register方法,把这个ServerSocketChannel注册到当前eventLoop的selector上面去。
那可能有人问了,ServerSocketChannel应该把OP_ACCEPT事件注册到selector上面去,这里怎么体现的呢?别着急,往下看。

2 端口绑定逻辑

当创建完NioServerSocketChannel后,会调用doBind0把创建好的NioServerSocketChannel传进来,见io.netty.bootstrap.AbstractBootstrap#doBind0

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    //在NioServerSocketChannel的eventLoop上执行bind
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
复制代码

继续跟进,见io.netty.channel.AbstractChannel.AbstractUnsafe#bind

@Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            ...
            boolean wasActive = isActive();
            try {
                //(1)端口绑定
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        (2)在pipeline传播ChannelActive事件:主要关心如何把OP_ACCEPT事件注册到selector上面的
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }
复制代码

(1)端口绑定

继续往里跟,见io.netty.channel.socket.nio.NioServerSocketChannel#doBind

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        //这个地方是不是又感觉很熟悉?就是调用Java NIO的bind进行端口绑定的,见《从Java NIO到Netty(一)》
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
复制代码

(2)在pipeline传播ChannelActive事件

往里跟,见io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //这个地方就是把事件继续向下一个链表节点传播
        ctx.fireChannelActive();
        //主要看这个
        readIfIsAutoRead();
    }
    private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                //往这里面走
                channel.read();
            }
    }
复制代码

继续往里走直到走到io.netty.channel.nio.AbstractNioChannel#doBeginRead(如果不知道怎么走的建议debug模式自己往里跟)

    @Override
    protected void doBeginRead() throws Exception {
        //这个官方注释也表述很清楚了,这两个方法最终会调用到这个地方
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            //还记得我们之前创建NioServerSocketChannel时把SelectionKey.OP_ACCEPT事件赋值给了这个readInterestOp吗?看这个地方就把这个事件加到这个selectionKey中了
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
复制代码

综上,我们bind源码就算讲解完了,有没有心中有数都干了什么?可以对比《从Java NIO到Netty(四)》中的这块代码看,只不过多了selector在eventloop上而已。那么我们下一章就讲一下两个EventLoopGroup的源码。

        //打开ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //监听端口9999
        serverSocketChannel.socket().bind(new InetSocketAddress(9999));
        //设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //创建Selector
        Selector selector = Selector.open();
        //将选serverSocketChannel注册到selector,并在注册过程中指出该serverSocketChannel可以进行Accept操作
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享