处理基于流的传输

Socket Buffer 的一个小注意事项
在基于流的传输(如 TCP/IP)中,接收到的数据存储在套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是数据包队列而是字节队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将它们视为两条消息,而只是一堆字节。因此,无法保证您阅读的内容与您的远程对等方所写的内容完全相同。例如,让我们假设操作系统的 TCP/IP 堆栈收到了三个数据包:

image.png
发送时收到三个数据包

由于基于流的协议的这一一般属性,在您的应用程序中很有可能以以下碎片形式读取它们:

image.png
三个数据包拆分并合并为四个缓冲区

因此,接收部分,无论是服务器端还是客户端,都应该将接收到的数据碎片整理成一个或多个有意义的帧,这些帧可以很容易地被应用程序逻辑理解。在上面的例子中,接收到的数据应该是这样的:

image.png
四个缓冲区碎片整理成三个

第一个解决方案

现在让我们回到TIME客户端示例。我们这里也有同样的问题。一个 32 位整数是一个非常小的数据量,它不太可能经常被碎片化。但是问题是可以分片,随着流量的增加,分片的可能性也会增加。

简单的解决方案是创建一个内部累积缓冲区,并等待所有 4 个字节都接收到内部缓冲区中。以下是TimeClientHandler解决问题的修改后的实现:

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码
  • 一个ChannelHandler有两个生命周期侦听器方法:handlerAdded()和handlerRemoved(). 您可以执行任意(去)初始化任务,只要它长时间不阻塞即可。
  • 首先,应将所有接收到的数据累加到buf.
  • 然后,处理程序必须检查是否buf有足够的数据,在此示例中为 4 个字节,然后继续执行实际的业务逻辑。否则,channelRead()当更多数据到达时,Netty 将再次调用该方法,最终将所有 4 个字节累加。

第二种解决方案

尽管第一个解决方案已经解决了TIME客户端的问题,但修改后的处理程序看起来并不那么干净。想象一个更复杂的协议,它由多个字段组成,例如可变长度字段。您的ChannelInboundHandler实施将很快变得不可维护。

您可能已经注意到,您可以将多个ChannelHandler添加到ChannelPipeline,因此,您可以将一个整体ChannelHandler拆分为多个模块化ChannelHandler,以降低应用程序的复杂性。例如,您可以拆分TimeClientHandler为两个处理程序:

  • TimeDecoder 处理碎片问题,以及
  • 的初始简单版本TimeClientHandler。

幸运的是,Netty 提供了一个可扩展的类,它可以帮助您编写第一个开箱即用的类:

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
复制代码
  1. ByteToMessageDecoder是一个ChannelInboundHandler可以轻松处理碎片问题的实现。
  2. 每当接收到新数据时,ByteToMessageDecoder都会使用内部维护的累积缓冲区调用decode()方法。
  3. 当累积缓冲区中没有足够的数据时,decode()可以决定不向out添加任何内容。当接收到更多数据时,ByteToMessageDecoder将再次调用decode()。
  4. 如果decode()将对象添加到out,则表示解码器已成功解码消息。ByteToMessageDecoder将丢弃累积缓冲区的读取部分。请记住,您不需要解码多条消息。ByteToMessageDecoder将继续调用decode()方法,直到它不向out添加任何内容

现在我们有了另一个处理程序要插入到ChannelPipeline中,我们应该修改TimeClient中的ChannelInitializer实现:

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});
复制代码

如果你是一个喜欢冒险的人,你可能会想尝试一下ReplayingDecoder,它可以简化解码器。不过,您需要参考API参考以获取更多信息。

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}
复制代码

此外,Netty 提供了开箱即用的解码器,使您能够非常轻松地实现大多数协议,并帮助您避免最终使用无法维护的单片处理程序实现。有关更详细的示例,请参阅以下软件包:

  • io.netty.example.factorial 对于二进制协议,和
  • io.netty.example.telnet 对于基于文本行的协议。
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享