Socket Buffer 的一个小注意事项
在基于流的传输(如 TCP/IP)中,接收到的数据存储在套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是数据包队列而是字节队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将它们视为两条消息,而只是一堆字节。因此,无法保证您阅读的内容与您的远程对等方所写的内容完全相同。例如,让我们假设操作系统的 TCP/IP 堆栈收到了三个数据包:
发送时收到三个数据包
由于基于流的协议的这一一般属性,在您的应用程序中很有可能以以下碎片形式读取它们:
三个数据包拆分并合并为四个缓冲区
因此,接收部分,无论是服务器端还是客户端,都应该将接收到的数据碎片整理成一个或多个有意义的帧,这些帧可以很容易地被应用程序逻辑理解。在上面的例子中,接收到的数据应该是这样的:
四个缓冲区碎片整理成三个
第一个解决方案
现在让我们回到TIME客户端示例。我们这里也有同样的问题。一个 32 位整数是一个非常小的数据量,它不太可能经常被碎片化。但是问题是可以分片,随着流量的增加,分片的可能性也会增加。
简单的解决方案是创建一个内部累积缓冲区,并等待所有 4 个字节都接收到内部缓冲区中。以下是TimeClientHandler解决问题的修改后的实现:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
复制代码
- 一个ChannelHandler有两个生命周期侦听器方法:handlerAdded()和handlerRemoved(). 您可以执行任意(去)初始化任务,只要它长时间不阻塞即可。
- 首先,应将所有接收到的数据累加到buf.
- 然后,处理程序必须检查是否buf有足够的数据,在此示例中为 4 个字节,然后继续执行实际的业务逻辑。否则,channelRead()当更多数据到达时,Netty 将再次调用该方法,最终将所有 4 个字节累加。
第二种解决方案
尽管第一个解决方案已经解决了TIME客户端的问题,但修改后的处理程序看起来并不那么干净。想象一个更复杂的协议,它由多个字段组成,例如可变长度字段。您的ChannelInboundHandler实施将很快变得不可维护。
您可能已经注意到,您可以将多个ChannelHandler添加到ChannelPipeline,因此,您可以将一个整体ChannelHandler拆分为多个模块化ChannelHandler,以降低应用程序的复杂性。例如,您可以拆分TimeClientHandler为两个处理程序:
- TimeDecoder 处理碎片问题,以及
- 的初始简单版本TimeClientHandler。
幸运的是,Netty 提供了一个可扩展的类,它可以帮助您编写第一个开箱即用的类:
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
复制代码
- ByteToMessageDecoder是一个ChannelInboundHandler可以轻松处理碎片问题的实现。
- 每当接收到新数据时,ByteToMessageDecoder都会使用内部维护的累积缓冲区调用decode()方法。
- 当累积缓冲区中没有足够的数据时,decode()可以决定不向out添加任何内容。当接收到更多数据时,ByteToMessageDecoder将再次调用decode()。
- 如果decode()将对象添加到out,则表示解码器已成功解码消息。ByteToMessageDecoder将丢弃累积缓冲区的读取部分。请记住,您不需要解码多条消息。ByteToMessageDecoder将继续调用decode()方法,直到它不向out添加任何内容
现在我们有了另一个处理程序要插入到ChannelPipeline中,我们应该修改TimeClient中的ChannelInitializer实现:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
复制代码
如果你是一个喜欢冒险的人,你可能会想尝试一下ReplayingDecoder,它可以简化解码器。不过,您需要参考API参考以获取更多信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
复制代码
此外,Netty 提供了开箱即用的解码器,使您能够非常轻松地实现大多数协议,并帮助您避免最终使用无法维护的单片处理程序实现。有关更详细的示例,请参阅以下软件包:
- io.netty.example.factorial 对于二进制协议,和
- io.netty.example.telnet 对于基于文本行的协议。