这是我参与更文挑战的第27天,活动详情查看:更文挑战
一、前言
之前,已经分析了
Netty
的服务端启动。
先来回顾一下,Netty
服务端启动流程,如图:
那么在
Netty
服务端完全启动之后,就可以对外服务了。
下面从这几方面进行分析:
- 服务端如何建立客户端的新连接?
- 服务端如何读取客户端的消息?
- 服务端处理完请求后,如何把响应消息发送给客户端?
二、直接怼源码
怼源码也按照流程来进行:
- 服务端如何建立客户端的新连接?
- 服务端如何读取客户端的消息?
- 服务端处理完请求后,如何把响应消息发送给客户端?
(1)服务端如何建立客户端的新连接?
流程,如图:
Netty
服务端处理客户端新建连接,主要分为四步:
Boss NioEventLoop
线程轮询客户端新连接OP_ACCEPT
事件- 构造
Netty
客户端NioSocketChannel
- 注册
Netty
客户端NioSocketChannel
到Worker
工作线程中 - 注册
OP_READ
事件到NioSocketChannel
的事件集合
Netty
中 Boss NioEventLoop
专门负责接收新的连接。
当客户端有新连接接入服务端时,Boss NioEventLoop
会监听到 OP_ACCEPT
事件,源码如下所示:
// 定位:io.netty.channel.nio
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
// ... ...
processSelectedKeys();
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
// ... ...
processSelectedKey(k, (AbstractNioChannel) a);
// ... ...
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 重要:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
|| readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
复制代码
那么来看下 unsafe.read();
是如何处理的?
可以看出
read()
方法的核心逻辑就是:通过while
循环不断读取数据,然后放入List
中,这里的数据其实就是新连接。
// 定位:io.netty.channel.nio
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
// 内部类
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
// ... ...
try {
try {
do {
// while 循环不断读取 Buffer 中的数据
int localRead = doReadMessages(readBuf);
// ... ...
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
// 触发 channelRead 事件传播
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
// ... ...
} finally {
// ... ...
}
}
}
}
复制代码
需要重点跟进一下 NioServerSocketChannel
的 doReadMessages()
方法。
// 定位:io.netty.channel.socket.nio
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 1. 先通过 JDK 底层的 accept() 获取 JDK 原生的 SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 2. 将它封装成 Netty 自己的 NioSocketChannel
// 2.1. 创建核心成员变量 id、unsafe、pipeline
// 2.2. 注册 SelectionKey.OP_READ 事件
// 2.3. 设置 Channel 的为非阻塞模式;新建客户端 Channel 的配置
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
复制代码
当成功构造客户端 NioSocketChannel
后,接下来会通过 pipeline.fireChannelRead()
触发 channelRead
事件传播。
特殊的处理器
ServerBootstrapAcceptor
,在这里它就发挥了重要的作用。
channelRead
事件会传播到 ServerBootstrapAcceptor.channelRead()
方法,channelRead()
会将客户端 Channel
分配到工作线程组中去执行。
具体实现如下:
// 定位:io.netty.bootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
// 内部类
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 在客户端 Channel 中添加 childHandler
// childHandler 是用户在启动类中通过 childHandler() 方法指定的
child.pipeline().addLast(childHandler);
// ... ...
try {
// 注册客户端 Channel
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
}
复制代码
ServerBootstrapAcceptor
通过 childGroup.register()
方法会完成第三和第四两个步骤,将 NioSocketChannel
注册到 Worker
工作线程中,并注册 OP_READ
事件到 NioSocketChannel
的事件集合。
在注册过程中比较有意思的一点是,它会调用 pipeline.fireChannelRegistered()
方法传播 channelRegistered
事件,然后再调用 pipeline.fireChannelActive()
方法传播 channelActive
事件。
readIfIsAutoRead()
方法,此时它会将 SelectionKey.OP_READ
事件注册到 Channel
的事件集合。
(2)服务端如何读取客户端的消息?
流程如图:
发现这块代码很绕,很难定位,那怎么办呢?
这里我说下我的方法,即在自己写的
handler
中打上断点,之后client
端进行访问,这时候就可以在IDEA
中一步步断点(F8
)
提示:
- 接收连接是
NioMessageUnsafe
,读取数据是NioByteUnsafe
- 接收连接是
boss
主Reactor
线程,读取数据是work
从Reactor
线程
服务端读取客户端消息,主要分为两步:
- 读取客户端的请求数据
- 交付给
pipeline
代码如下:
// 定位:io.netty.channel.nio
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
// 内部类
protected class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public final void read() {
// ... ...
try {
do {
// 1. 读取数据
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// ... ...
// 2. 重要:通知 pipeline
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
// ... ...
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// ... ...
}
}
}
}
复制代码
读取细节,代码如下:
// 定位:
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
next.invokeRead();
// ... ...
return this;
}
// 具体实现:
private void invokeRead() {
if (invokeHandler()) {
try {
// 这里会调用用户自定义的 handler
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
}
}
}
复制代码
(3)服务端处理完请求后,如何把响应消息发送给客户端?
流程如图:
这就相对容易些了,直接看 handler
中回写的代码:
// 这是写在
ChannelHandlerContext ctx;
ctx.write(responseBuffer);
复制代码
可以分为四步:
- 处理响应数据
ByteBuf
- 将响应数据放入响应缓存中
NioEventLoop
读取- 选择
SocketChannel
- 处理响应数据
ByteBuf
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
// ... ...
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 重要:
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
// ... ...
//
next.invokeWriteAndFlush(m, promise);
// ... ...
}
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
}
复制代码
- 将响应数据放入响应缓存中
// 自定义的 handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 之后,会走到这
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
// 定位:io.netty.channel
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
next.invokeFlush();
// ... ...
}
}
// 之后慢慢进入,会发现:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// 写入缓存
outboundBuffer.addFlush();
flush0();
}
protected void flush0() {
// 执行真正写
// 抽象方法,实际上会走 NioSocketChannel
doWrite(outboundBuffer);
}
}
复制代码
-
NioEventLoop
读取 -
选择
SocketChannel
// 定位:io.netty.channel.socket.nio
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// ... ...
// 找到对应 客户端的channel
SocketChannel ch = javaChannel();
}
}
复制代码