【Netty】服务端处理客户端新请求

这是我参与更文挑战的第27天,活动详情查看:更文挑战

一、前言

之前,已经分析了 Netty 的服务端启动。

先来回顾一下,Netty 服务端启动流程,如图:

服务端启动.png

那么在 Netty 服务端完全启动之后,就可以对外服务了。

下面从这几方面进行分析:

  1. 服务端如何建立客户端的新连接?
  2. 服务端如何读取客户端的消息?
  3. 服务端处理完请求后,如何把响应消息发送给客户端?

二、直接怼源码

怼源码也按照流程来进行:

  1. 服务端如何建立客户端的新连接?
  2. 服务端如何读取客户端的消息?
  3. 服务端处理完请求后,如何把响应消息发送给客户端?

(1)服务端如何建立客户端的新连接?

流程,如图:

服务端建立连接.png

Netty 服务端处理客户端新建连接,主要分为四步:

  1. Boss NioEventLoop 线程轮询客户端新连接 OP_ACCEPT 事件
  2. 构造 Netty 客户端 NioSocketChannel
  3. 注册 Netty 客户端 NioSocketChannelWorker 工作线程中
  4. 注册 OP_READ 事件到 NioSocketChannel 的事件集合

NettyBoss NioEventLoop 专门负责接收新的连接。
当客户端有新连接接入服务端时,Boss NioEventLoop 会监听到 OP_ACCEPT 事件,源码如下所示:

// 定位:io.netty.channel.nio
public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() {
        for (;;) {
            // ... ...
            processSelectedKeys();
        }
    }
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }    
    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            // ... ...
            processSelectedKey(k, (AbstractNioChannel) a);
            // ... ...
        }
    }    
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            // 重要:
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 
                || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}
复制代码

那么来看下 unsafe.read(); 是如何处理的?

可以看出 read() 方法的核心逻辑就是:通过 while 循环不断读取数据,然后放入 List 中,这里的数据其实就是新连接。

// 定位:io.netty.channel.nio
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {

    // 内部类
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        private final List<Object> readBuf = new ArrayList<Object>();
        @Override
        public void read() {
            // ... ...
            try {
                try {
                    do {
                        // while 循环不断读取 Buffer 中的数据
                        int localRead = doReadMessages(readBuf);
                        // ... ...
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
                // 触发 channelRead 事件传播
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                // ... ...
            } finally {
                // ... ...
            }
        }
    }
}
复制代码

需要重点跟进一下 NioServerSocketChanneldoReadMessages() 方法。

// 定位:io.netty.channel.socket.nio
public class NioServerSocketChannel extends AbstractNioMessageChannel
    implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // 1. 先通过 JDK 底层的 accept() 获取 JDK 原生的 SocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // 2. 将它封装成 Netty 自己的 NioSocketChannel
                // 2.1. 创建核心成员变量 id、unsafe、pipeline
                // 2.2. 注册 SelectionKey.OP_READ 事件
                // 2.3. 设置 Channel 的为非阻塞模式;新建客户端 Channel 的配置
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
}
复制代码

当成功构造客户端 NioSocketChannel 后,接下来会通过 pipeline.fireChannelRead() 触发 channelRead 事件传播。

特殊的处理器 ServerBootstrapAcceptor,在这里它就发挥了重要的作用。

channelRead 事件会传播到 ServerBootstrapAcceptor.channelRead() 方法,channelRead() 会将客户端 Channel 分配到工作线程组中去执行。

具体实现如下:

// 定位:io.netty.bootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    // 内部类
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            // 在客户端 Channel 中添加 childHandler
            // childHandler 是用户在启动类中通过 childHandler() 方法指定的
            child.pipeline().addLast(childHandler);

            // ... ...

            try {
                // 注册客户端 Channel
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    }
}
复制代码

ServerBootstrapAcceptor 通过 childGroup.register() 方法会完成第三和第四两个步骤,将 NioSocketChannel 注册到 Worker 工作线程中,并注册 OP_READ 事件到 NioSocketChannel 的事件集合。

在注册过程中比较有意思的一点是,它会调用 pipeline.fireChannelRegistered() 方法传播 channelRegistered 事件,然后再调用 pipeline.fireChannelActive() 方法传播 channelActive 事件。

readIfIsAutoRead() 方法,此时它会将 SelectionKey.OP_READ 事件注册到 Channel 的事件集合。

(2)服务端如何读取客户端的消息?

流程如图:

服务端读取客户端消息.png

发现这块代码很绕,很难定位,那怎么办呢?

这里我说下我的方法,即在自己写的 handler 中打上断点,之后 client 端进行访问,这时候就可以在 IDEA 中一步步断点(F8

提示:

  1. 接收连接是 NioMessageUnsafe,读取数据是 NioByteUnsafe
  2. 接收连接是 bossReactor 线程,读取数据是 workReactor 线程

服务端读取客户端消息,主要分为两步:

  1. 读取客户端的请求数据
  2. 交付给 pipeline

代码如下:

// 定位:io.netty.channel.nio
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    // 内部类
    protected class NioByteUnsafe extends AbstractNioUnsafe {
                @Override
        public final void read() {
            // ... ...
            try {
                do {
                    // 1. 读取数据
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    // ... ...
                    
                    // 2. 重要:通知 pipeline
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                // ... ...
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
               // ... ...
            }
        }
    }
}
复制代码

读取细节,代码如下:

// 定位:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
    implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    @Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        next.invokeRead();
        
        // ... ...
        return this;
    }

    // 具体实现:
    private void invokeRead() {
        if (invokeHandler()) {
            try {
                // 这里会调用用户自定义的 handler
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }
}
复制代码

(3)服务端处理完请求后,如何把响应消息发送给客户端?

流程如图:

服务端返回响应消息.png

这就相对容易些了,直接看 handler 中回写的代码:

// 这是写在
ChannelHandlerContext ctx;
ctx.write(responseBuffer);
复制代码

可以分为四步:

  1. 处理响应数据 ByteBuf
  2. 将响应数据放入响应缓存中
  3. NioEventLoop 读取
  4. 选择 SocketChannel

  1. 处理响应数据 ByteBuf
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
    implements ChannelHandlerContext, ResourceLeakHint {
    
        @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        // ... ...
        write(msg, false, promise);

        return promise;
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // 重要:
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        // ... ...
        // 
        next.invokeWriteAndFlush(m, promise);
        // ... ...
    }
    
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }
    
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
}
复制代码
  1. 将响应数据放入响应缓存中
// 自定义的 handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    // 之后,会走到这
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

// 定位:io.netty.channel
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelHandlerContext flush() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        next.invokeFlush();
        // ... ...
    }
}

// 之后慢慢进入,会发现:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    @Override
    public final void flush() {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
        // 写入缓存
        outboundBuffer.addFlush();
        flush0();
    }
    
    protected void flush0() {
        // 执行真正写
        // 抽象方法,实际上会走 NioSocketChannel
        doWrite(outboundBuffer);
    }
}
复制代码
  1. NioEventLoop 读取

  2. 选择 SocketChannel

// 定位:io.netty.channel.socket.nio
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        // ... ...
        
        // 找到对应 客户端的channel
        SocketChannel ch = javaChannel();
    }
}
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享