通过源码去了解Netty-io.netty.channel

io.netty.channel是什么

Channel作为网络socket或IO组件的连接体。io操作需要通过channel发起,所以继承了outbound.

extends AttributeMap, ChannelOutboundInvoker, java.lang.Comparable<Channel>
复制代码

与网络套接字或能够进行I / O操作(如读取,写入,连接和绑定)的组件的关系。
一个channel为用户提供:

  • channel的当前状态(例如,它是否打开?是否连接?),
  • 通道的configuration parameters (例如接收缓冲区大小),
  • 通道支持的I / O操作(例如读取,写入,连接和绑定)和
  • 处理所有I / O事件和与该通道相关的请求的ChannelPipeline 。

提供一个用户试图

  • channel的状态,如open/connected
  • channel的配置
  • channel支持的IO操作
  • channelpipeline,处理与该channel相关的所有IO事件/请求

所有的I/O操作都是异步的

由于采用事件驱动的机制,所以Netty中的所有IO操作都是异步的。这意味着当我们调用一个IO操作时,方法会立即返回并不保证操作已经完成。由上一章Future的讲解中,我们知道,这些IO操作会返回一个ChannelFuture对象,我们需要通过添加监听者的方式执行操作完成后需执行的代码。

Channel是有等级的 parent()

如果一个Channel由另一个Channel创建,那么他们之间形成父子关系。比如说,当ServerSocketChannel通过accept()方法接受一个SocketChannel时,那么SocketChannel的父亲是ServerSocketChannel,调用SocketChannel的parent()方法返回该ServerSocketChannel对象。

可以使用向下转型获取子类的特定操作

某些子类Channel会提供一些所需的特定操作,可以向下转型到这样的子类,从而获得特定操作。比如说,对于UDP的数据报的传输,有特定的join()和leave()操作,我们可以向下转型到DatagramChannel从而使用这些操作。

释放资源

当一个Channel不再使用时,须调用close()或者close(ChannelPromise)方法释放资源。

Channel配置参数 ChannelOption

(1). 通用参数

  • CONNECT_TIMEOUT_MILLIS

        Netty参数,连接超时毫秒数,默认值30000毫秒即30秒。

  • MAX_MESSAGES_PER_READ

        Netty参数,一次Loop读取的最大消息数,对于ServerChannel或者NioByteChannel,默认值为16,其他Channel默认值为1。默认值这样设置,是因为:ServerChannel需要接受足够多的连接,保证大吞吐量,NioByteChannel可以减少不必要的系统调用select。

  • WRITE_SPIN_COUNT

        Netty参数,一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多进行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才能被响应不会因为单个大数据量写请求而耽误。

  • ALLOCATOR

        Netty参数,ByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT,4.0版本为UnpooledByteBufAllocator,4.1版本为PooledByteBufAllocator。该值也可以使用系统参数io.netty.allocator.type配置,使用字符串值:”unpooled”,”pooled”。

  • RCVBUF_ALLOCATOR

        Netty参数,用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。

  • AUTO_READ

        Netty参数,自动读取,默认值为True。Netty只在必要的时候才设置关心相应的I/O事件。对于读操作,需要调用channel.read()设置关心的I/O事件为OP_READ,这样若有数据到达才能读取以供用户处理。该值为True时,每次读操作完毕后会自动调用channel.read(),从而有数据到达便能读取;否则,需要用户手动调用channel.read()。需要注意的是:当调用config.setAutoRead(boolean)方法时,如果状态由false变为true,将会调用channel.read()方法读取数据;由true变为false,将调用config.autoReadCleared()方法终止数据读取。

  • WRITE_BUFFER_HIGH_WATER_MARK

        Netty参数,写高水位标记,默认值64KB。如果Netty的写缓冲区中的字节超过该值,Channel的isWritable()返回False。

  • WRITE_BUFFER_LOW_WATER_MARK

        Netty参数,写低水位标记,默认值32KB。当Netty的写缓冲区中的字节超过高水位之后若下降到低水位,则Channel的isWritable()返回True。写高低水位标记使用户可以控制写入数据速度,从而实现流量控制。推荐做法是:每次调用channl.write(msg)方法首先调用channel.isWritable()判断是否可写。

  • MESSAGE_SIZE_ESTIMATOR

        Netty参数,消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。

  • SINGLE_EVENTEXECUTOR_PER_GROUP

        Netty参数,单线程执行ChannelPipeline中的事件,默认值为True。该值控制执行ChannelPipeline中执行ChannelHandler的线程。如果为Trye,整个pipeline由一个线程执行,这样不需要进行线程切换以及线程同步,是Netty4的推荐做法;如果为False,ChannelHandler中的处理过程会由Group中的不同线程执行。

(2).SocketChannel参数

  • SO_RCVBUF

        Socket参数,TCP数据接收缓冲区大小。该缓冲区即TCP接收滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_rmem查询其大小。一般情况下,该值可由用户在任意时刻设置,但当设置值超过64KB时,需要在连接到远端之前设置。

  • SO_SNDBUF

        Socket参数,TCP数据发送缓冲区大小。该缓冲区即TCP发送滑动窗口,linux操作系统可使用命令:cat /proc/sys/net/ipv4/tcp_smem查询其大小。

  • TCP_NODELAY

        TCP参数,立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输延时。

  • SO_KEEPALIVE

        Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。

  • SO_REUSEADDR

        Socket参数,地址复用,默认值False。有四种情况可以使用:(1).当有一个有相同本地地址和端口的socket1处于TIME_WAIT状态时,而你希望启动的程序的socket2要占用该地址和端口,比如重启服务且保持先前端口。(2).有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。(3).单个进程绑定相同的端口到多个socket上,但每个socket绑定的ip地址不同。(4).完全相同的地址和端口的重复绑定。但这只用于UDP的多播,不用于TCP。

  • SO_LINGER

         Netty对底层Socket参数的简单封装,关闭Socket的延迟时间,默认值为-1,表示禁用该功能。-1以及所有<0的数表示socket.close()方法立即返回,但OS底层会将发送缓冲区全部发送到对端。0表示socket.close()方法立即返回,OS放弃发送缓冲区的数据直接向对端发送RST包,对端收到复位错误。非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到或发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。

  • IP_TOS

        IP参数,设置IP头部的Type-of-Service字段,用于描述IP包的优先级和QoS选项。

  • ALLOW_HALF_CLOSURE

        Netty参数,一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。

(3).ServerSocketChannel参数

  • SO_RCVBUF

        已说明,需要注意的是:当设置值超过64KB时,需要在绑定到本地端口前设置。该值设置的是由ServerSocketChannel使用accept接受的SocketChannel的接收缓冲区。

  • SO_REUSEADDR

        已说明

  • SO_BACKLOG

        Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。

(4).DatagramChannel参数

  • SO_BROADCAST

        Socket参数,设置广播模式。

  • SO_RCVBUF

        已说明

  • SO_SNDBUF

        已说明

  • SO_REUSEADDR

        已说明

  • IP_MULTICAST_LOOP_DISABLED

        对应IP参数IP_MULTICAST_LOOP,设置本地回环接口的多播功能。由于P_MULTICAST_LOOP返回True表示关闭,所以Netty加上后缀_DISABLED防止歧义。

  • IP_MULTICAST_ADDR

        对应IP参数IP_MULTICAST_IF,设置对应地址的网卡为多播模式。

  • IP_MULTICAST_IF

        对应IP参数IP_MULTICAST_IF2,同上但支持IPV6。

  • IP_MULTICAST_TTL

        IP参数,多播数据报的time-to-live即存活跳数。

  • IP_TOS

        已说明

  • DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION

        Netty参数,DatagramChannel注册的EventLoop即表示已激活。

I/O事件处理


    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    Channel read();
    ChannelFuture write(Object msg);
    Channel flush();
    ChannelFuture writeAndFlush(Object msg);
复制代码

这里的I/O事件都是outbound出站事件,表示由用户发起,即用户可以调用这些方法产生响应的事件

Unsafe

Unsafe?直译中文为不安全,这曾给我带来极大的困扰。如果你是第一次遇到这种接口,一定会和我感同身受。一个Unsafe对象是不安全的?这里说的不安全,是相对于用户程序员而言的,也就是说,用户程序员使用Netty进行编程时不会接触到这个接口和相关类。为什么不会接触到呢?因为类似的接口和类是Netty的大量内部实现细节,不会暴露给用户程序员。然而我们的目标是自顶向下深入分析Netty,所以有必要深入Unsafe雷区。我们先看Unsafe接口中的方法:

SocketAddress localAddress();   // 本地地址
    SocketAddress remoteAddress();  // 远端地址
    ChannelPromise voidPromise();   // 不关心结果的异步Promise?
    ChannelOutboundBuffer outboundBuffer(); // 写缓冲区
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, 
                              ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();
复制代码

也许你已经发现Unsafe接口和Channel接口中都有register、bind等I/O事件相关的方法,它们有什么区别呢?回忆一下EventLoop线程实现,当一个selectedKey就绪时,对I/O事件的处理委托给unsafe对象实现,代码类似如下:

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT); 
        unsafe.finishConnect(); 
    }
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                  || readyOps == 0) {
        unsafe.read(); 
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }
复制代码

也就是说,Unsafe的子类作为Channel的内部类,负责处理底层NIO相关的I/O事件。Channel则使用责任链的方式通过ChannelPipeline将事件提供给用户自定义处理。

Channel实现

需要实现channelHandler具体实现类

  • ChannelInboundHandler处理IO事件
  • ChannelOutboundHandler处理IO操作

更方便的方式:实现对应的适配器。

  • ChannelInboundHandlerAdapter处理IO事件
  • ChannelOutboundHandlerAdapter处理IO操作
  • ChannelDuplexHandler处理IO事件与操作

点击查看后续

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享