引言
说起Netty,可能很多开发者并没有使用经验,但是我敢说,在你使用的框架中,一定有他的身影,比如(RocketMQ,redisson,dubbo,motan,Elasticsearch......
想了解更多使用Netty的项目? –> 戳这里 ), 在本文中,我们将以通俗易懂的方式,讲清楚每一个组件的职责和作用。
- 因为Netty是基于Java中NIO的封装,其本质还是使用到了Java中的NIO,所以我们先来了解下NIO的几个组件
Channel
,Selector
和ByteBuffer
1、Java NIO 三剑客 Channel
,Selector
和 ByteBuffer
1.1、 Java NIO 的 Channel
- 可以简单理解为一条连接(短连接 or 长连接),或者有人叫它通道,也没问题。
1.2、Selector选择器
- 每一个连接在创建后将会注册(
必须是实现了SelectableChannel的连接才可以注册到selector中
)到某一个选择器(selector
)上,然后轮询选择器上IO已经就绪的channel
,将某批IO就绪的channel
对应的selectKey
(在注册时候就将某个channel
与某个SelectedKey
绑定的) 添加进selectedKeys
这个set
集合中去,后续轮询则是对该selectedKeys
轮询,取出对应的selectedKey
上的channel
,然后读取channel
的数据到byteBuffer
中,接着我们可以从byteBuffer
缓冲区读取数据,并进行业务处理。
1.3、ByteBuffer缓冲区
- 本质上是一个内存块,既可以写入数据,也可以从中读取数据,其提供了三个重要的成员属性:
capacity
(容量)、position
(读写位置)、limit
(读写的限制)对于byteBuffer
,我们点到为止,知道他是干什么的就行了,对其怎么使用以及api,我们暂时不做过多展开,因为这个类的使用还是有点复杂的,一两句话也很难解释清。
1.4、(补充说明) selector可以监听的事件类型
需要说明的是,在channel注册到selector时候,会传入一个参数即ops,代表这个选择器对哪些事件感兴趣 ,直白些就是:
指定选择器要监控的IO事件类型
包括:
(1)可读:SelectionKey.OP_READ
(2)可写:SelectionKey.OP_WRITE
(3)连接:SelectionKey.OP_CONNECT
(4)接收:SelectionKey.OP_ACCEPT
什么是IO事件呢?这个概念容易混淆,这里特别说明一下。这里的IO事件不是对通道的IO操作,而是通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件。比方说,某个SocketChannel通道,完成了和对端的握手连接,则处于“连接就绪”(OP_CONNECT)状态。再比方说,某个ServerSocketChannel服务器通道,监听到一个新连接的到来,则处于“接收就绪”(OP_ACCEPT)状态。还比方说,一个有数据可读的SocketChannel通道,处于“读就绪”(OP_READ)状态;一个等待写入数据的,处于“写就绪”(OP_WRITE)状态
在讲解各个组件之前,我们需要先了解Netty是怎么工作的,以及每个环节都干了什么。这样我们从整体上有了自己的认识后,再去了解其每一个节点上的组件,我想这样会更好一些。
2、Netty线程模型(让我们知道Netty是如何工作的)
对于在上篇文章中 【Netty系列_1】Netty简介与I/O&线程模型 介绍的三种Reactor
线程模型,Netty
可以说是都支持。下面我们主要看基于Reactor
主从多线程模型 下的Netty
线程模型(在实际开发中,也是这种模型更多一些)
2.2、基于Reactor主从多线程模型下的Netty工作示意图
2.3、父子通道说明和对上图的解释
2.3.1、父子通道说明
-
在解释前,有必要说一下常用的两种Channel,即 ServerSocketChannel和SocketChannel
ServerSocketChannel对应的socket描述符是连接监听类型。连接监听类型的socket描述符,一般放在服务器端,它负责接收客户端的套接字连接,在服务器端,一个“连接监听类型”的socket描述符可以接受(Accept)成千上万的传输类的socket描述符,而SocketChannel对应的socket描述符是传输数据类型。传输类的socket描述符负责传输数据。同一条TCP的Socket传输链路(即连接),在服务器和客户端,都分别会有一个与之相对应的数据传输类型的socket描述符 这也是为什么ServerSocketChannel只在启动时创建一次,而SocketChannel则是新来个连接,就需要新建一个SocketChannel(在做Netty调优时候,我们有一招就是扩大系统(例如Linux)的文件描述符数量,其实这里的文件描述符,就是传输类型的文件描述符)
在Netty中,将有接收关系的ServerSocketChannel和SocketChannel,叫作父子通道。其中,NioServerSocketChannel负责服务器连接监听和接收,也叫父通道(Parent Channel)。而NioSocketChannel传输类通道,叫做子通道(Child Channel)
2.3.2、对上图的解释(重要)
boss group
用于监听客户端连接,如果轮询到父通道(ServerSocketChannel
)中有接收就绪
即accept
类的IO 事件,那么将会添加该父通道所绑定的selectKey到selectedKeys
中,并通过while
循环对selectedKeys
进行轮询,只要selectedKeys
有元素,那么就调用processSelectedKeys
方法,之后将其添加到taskQueue
队列,然后执行runAllTasks
并创建子通道(SocketChannel
)并且将创建好的SocketChannel
传递并注册到worker group
的某个NioEventLoop
的selector
中,用于后续的read / write
等操作worker group
用于处理每一个连接发生的读写IO事件,worker group
中的NioEventLoop
会通过while循环轮询绑定到自身的SocketChannel
(实际上就是轮询selectedKeys),获取已经注册并且有I/O事件就绪的
(例如:read/write/心跳……)SocketChannel
,然后在NioEventLoop
的 I/O事件处理(processSelectedKeys
) 阶段分发给(通过runAllTasks)对应的ChannelPipeline
进行处理,而ChannelPipeLine
则是在我们程序中自己定义的通过ServerBootstrap的childHandler方法,将某SocketChannel与ChannelPipeLine建立绑定关系,在ChannelPipeLine
中,我们可以设置各种各样的ChannelHandler
,例如(HttpServerCodec,LoggingHandler,HttpObjectAggregator,IdleStateHandler,WebSocketServerCompressionHandler,ChunkedWriteHandler
) 或者我们自定义的处理业务逻辑的ChannelHandler
,到此时,我们开始在自定义的ChannelHandler中处理业务逻辑了。
3、Channel(注意是Netty包下的,不是Java NIO包下的)
- Netty中的
channel
的作用,本质和JavaNIO中的channel
的作用是一样的。
我们可以简单认为他就是一条网络连接
,或者说一个通道
。而连接(通道)是TCP通信必不可少的,由 它负责同对端进行网络通信
,可以写入数 据到对端
,也可以从对端读取数据
,由此可见Channel是网 络通信重点中的重点
3.1、 Channel的构造方法
- parent 父通道 SocketChannel的父通道为ServerSocketChannel,而ServerSocketChannel的父通道是null 因为它本身就是父通道,
- 每个通道都有唯一的ID
- unsafe 完成底层的实际I/O操作
- 每个通道都有对应的ChannelPipeline用于处理该通道对应的I/O事件
protected AbstractChannel(Channel parent) {
//父通道 SocketChannel的父通道为ServerSocketChannel(连接), ServerSocketChannel的父通道为null
this.parent = parent;
//每个通道都有唯一的ID
this.id = this.newId();
//完成底层的IO操作
this.unsafe = this.newUnsafe();
//每个通道都有对应的ChannelPipeline用于处理该通道对应的IO事件
this.pipeline = this.newChannelPipeline();
}
复制代码
3.2、 Channel的常用方法
ChannelFuture connect(SocketAddress address)
:连接远程服务器。方法的参数为远程服务器的地址,调用后会立即返回,返回值为负责连接操作的异步任务ChannelFuture。此方法在客户端的传输通道使用。ChannelFuture bind(SocketAddress address)
:绑定监听地址,开始监听新的客户端连接。此方法在服务器的新连接监听和接收通道使用。ChannelFuture close()
:关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync() 方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。Channel read()
此方法的作用为:读取通道数据,并且启动入站处理。具体来说,从内部的Java NIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法返回通道自身, 用于链式调用。ChannelFuture write(Object o)
此方法的作用为:启程出站流水处理,把处理后的最终数据写到底层Java NIO通道。此方法的返回值为出站处理的异步处理任务。Channel flush()
此方法的作用为:将缓冲区中的数据立即写出到对端。并不是每一次write操作都是将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入到操作系统的缓冲区,操作系统会将根据缓冲区的情况,决定什么时候把数据写到对端。而执行flush()方法立即将缓冲区的数据写到对端。
上述6种方法,都是比较常见的方法
。在Channel接口中以及各种通道的实现类中,还定义了大量的通道操作方法,具体还有哪些感兴趣可以看看Netty的源码。
4、ChannelPipeline
4.1、 ChannelPipeline 概念解释
-
每条通道内部都有一条流水线(Pipeline)将
ChannelHandler
装配起来。Netty的业务处理器流水线ChannelPipeline
是基于责任链
设计模式来设计的,内部是一个双向链表结构,能够支持动态地添加和删除ChannelHandler
处理器。另外值的注意的是处理器还分为入站处理器和出站处理器,入站处理器实现ChannelInboundHandler
,出站处理器实现ChannelOutboundHandler
,入站处理器复制处理流入(如read
)的数据,出站处理器负责流出(如write
)的数据
4.2 给某个SocketChannel 设置 ChannelHandler链,即(ChannelPipeline)
- 我们可以往
SocketChannel
的channelPipeLine
中添加各种各样的channelHandler
包括编解码
,拆包粘包
处理以及我们处理业务的channelHandler
都是在此处添加的 如下图:
- 在设置好各种各样的
channelHandler
后,假如从某个socketChannel
中读到数,那么将会根据设置的channelHandler
的顺序,先后执行(注意,这里的先后执行是有条件的,即先执行入站,再执行出站类型的channelHandler
- 举个例子,如果你往某个
socketChannel
对应的channelPipeline
中先后添channelHandlerA , channelHandlerB, channelHandlerC,channelHandlerD 而channelC是出站类型的处理器, A B D是入站类的处理器,那么数据的流经顺序是A->B>D
然后如果有出站事件 比如 write, 那么才会流经 channelHandlerC
- 举个例子,如果你往某个
4.3、截断ChannelPipeline
- 值的注意的是 入站处理器是可以被截断的 (出站如果截断将会报错哦!),这里我们以channelRead0方法为例,看看如何截断入站处理流程
4.3.1、可以看到截断入站处理器有两种方式如下:
- 不调用
super.channelRead(ctx,msg);
- 不调用
ctx.fireChannelRead(msg);
4.4、动态删除ChannelPipeline中的某个ChannelHandler (很实用的一个特性)
-
事实上,我们可以把
channelHandler
看成是一段相对聚合的业务逻辑channelPipeline
是把这些个较内聚的handler编排成一个流程链,比如我们可以把权限认证做成一个channelHandler
,把数据的读取,处理做成一个channelHandler
,假如我们第一次登录走了权限认证的channelHandler
,那么下次该连接的数据再入站时,其实是没必要再次做权限认证的,我们只有判断其登录成功,就放行其进行后续的业务处理就好了,那么有没有什么方法,动态热插拔channelHandler呢
(也就是说在登录后,如何移除权限的channelHandler呢
)?
见下图:
-
另外如果你阅读过源码,那么可以得知
ChannelInitializer
本质上也是一个ChannelInboundHandler
,在ChannelInitializer
类的initChannel
方法中,就调用了remove
把他自己给移除了,很明显,在第一次给某个连接设置好一些channelHandler
后,下次就不需要再设置啦,所以他把自己给移除了。private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (this.initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { this.initChannel(ctx.channel()); } catch (Throwable var6) { this.exceptionCaught(ctx, var6); } finally { //移除自己 this.remove(ctx); } return true; } else { return false; } } 复制代码
5、ChannelHandler
- ChannelHandler主要任务为:
数据包解码 decode
、(read)后的业务处理
、目标数据编码 encode
、把数据包写到通道中 即(write)
- 上边我们也说了,我们可以把
channelHandler
看成是一段相对聚合的业务逻辑。比如编码
我们可以搞一个继承ChannelOutBoundHandler
的handler,解码
我们可以搞一个继承ChannelInboundHandler
的handler
5.1、两种ChannelHandler
- 入站处理 触发的方向为:自底向上,即从网卡读取到数据后,再到Netty的内部(如通道)然后进入ChannelInboundHandler入站处理器。
- 出站处理 触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。按照这种方向来分,前面数据包解码、业务处理两个环节——属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节——属于出站处理器的工作
5.1.1、ChannelInboundHandler 入站处理器常用方法
public interface ChannelInboundHandler extends ChannelHandler {
//
void channelRegistered(ChannelHandlerContext var1) throws Exception;
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
void channelActive(ChannelHandlerContext var1) throws Exception;
void channelInactive(ChannelHandlerContext var1) throws Exception;
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
复制代码
-
channelRegistered
当通道注册完成后,Netty会调用fireChannelRegistered,触发通道注册事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRegistered方法,会被调用到 -
channelActive
当通道激活完成后,Netty会调用fireChannelActive,触发通道激活事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelActive方法,会被调用到 -
channelRead
当通道缓冲区可读,Netty会调用fireChannelRead,触发通道可读事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelRead方法,会被调用到。 -
channelReadComplete
当通道缓冲区读完,Netty会调用fireChannelReadComplete,触发通道读完事件。通道会启动该入站操作的流水线处理,在通道注册过的入站处理器Handler的channelReadComplete方法,会被调用到。 -
channelInactive
当连接被断开或者不可用,Netty会调用fireChannelInactive,触发连接不可用事件。通道会启动对应的流水线处理,在通道注册过的入站处理器Handler的channelInactive方法,会被调用到。 -
exceptionCaught
当通道处理过程发生异常时,Netty会调用fireExceptionCaught,触发异常捕获事件。通道会启动异常捕获的流水线处理,在通道注册过的处理器Handler的exceptionCaught方法,会被调用到。注意,这个方法是在通道处理器中ChannelHandler定义的方法,入站处理器、出站处理器接口都继承到
5.1.2、ChannelOutBoundHandler 出站处理器常用方法
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
void read(ChannelHandlerContext var1) throws Exception;
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
void flush(ChannelHandlerContext var1) throws Exception;
}
复制代码
bind
监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。如果使用TCP传输协议,这个方法用于服务器端connect
连接服务端:完成底层Java IO通道的服务器端的连接操作。如果使用TCP传输协议,这个方法用于客户端。write
写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是是写到用户进程的缓存区中,并不是完成实际的数据写入操作。flush
处理缓冲区中的数据,把这些数据写到对端:将底层缓存区的数据腾空,立即写出到对端。read
从底层读数据:完成Netty通道从Java IO通道的数据读取。disConnect
断开服务器连接:断开底层Java IO通道的服务器端连接。如果使用TCP传输协议,此方法主要用于客户端
结语
到此,本文就结束了,事实上最初我是准备把NioEventLoop
也加入此文,但是该类的解析,需要结合源码,由于本文和源码关联不是很紧密,所以准备在后续的源码分析中再讲解NioEventLoop
! ps:这个类是重点
! 理解了他,我相信你对Netty的设计会有更深的认识!
~
此文仅仅是对Netty的工作流程和部分组件进行解析和介绍,还缺少ByteBuf
,unsafe
,future
,promise
,编解码
,拆粘包
等等。想一文说完讲透 ,是相当难的。所以就留在后续的文章中慢慢分析学习吧
!