粘包/拆包背景
首先,我们知道TCP是字节流,就是像我们的水流一样,是没有界限的;所以,它会出现粘包/拆包的问题;而像我们的UDP是面向报文的,所以,UDP就不会出现这个问题;
TCP分包:分包的话,以下情况可能会导致分包:1.像我们写入套接字缓存区的字节大小超过了缓冲区最大值;2.网络发送数据时,受限于MSS最大报文长度的限制;会导致拆包;3.在数据链路层中,因为不同的数据链路层是不一样的,所以,不一定是以太网协议,所以,也有可能传输数据的单元较小,会导致IP分片;
如果说:每个请求的数据量并不大,获取就避免了拆包问题;但是,又产生了另外一个问题:TCP粘包;
一、TCP粘包
每次发送的数据量是不定的。有可能仅仅是为了发送一个心跳,下一秒就是一个大文件传输。所以我们只能面对去解决多变的业务场景。TCP 协议为了提高传输效率,同时兼容更加复杂的环境,往往做了这方面的优化。例如说,发送端为了提高网络传输的成功率(),一般都是收集到足够的数据才发送一个 TCP 段。这样如果你有多个请求的数据量极小,那么 TCP 发送出去的数据可能包含了多个请求的数据,这就导致了多个数据包粘在一起了;而同样,由于接收方不及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)。这就是 TCP 的粘包行为。
代码模拟
客户端:
/**
* @auther:lgb
* @Date: 2021/5/3
*/
public class BIOTcpClient {
public static void main(String[] args) {
//定义缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = null;
try {
//打开SocketChannel
socketChannel = SocketChannel.open();
//设置为非阻塞模式
socketChannel.configureBlocking(false);
//连接服务
socketChannel.connect(new InetSocketAddress("127.0.0.1", 2333));
while (true) {
//这里的finishConnect是尝试连接,有可能返回false,因此使用死循环进行连接检查,确保连接已经正常建立。
if (socketChannel.finishConnect()) {
System.out.println("客户端已连接到服务器");
int i = 0;
while (i < 5) {
//隔一耗秒钟写一条
TimeUnit.MILLISECONDS.sleep(1);
String info = "来自客户端的第" + (i++) + "条消息";
buffer.clear();
buffer.put(info.getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
//给服务写消息
socketChannel.write(buffer);
}
}
break;
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (socketChannel != null) {
System.out.println("客户端Channel关闭");
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
服务器端:
/**
* @auther:lgb
* @Date: 2021/5/3
*/
public class BIOTcpServer {
public static void main(String[] args) {
ServerSocket serverSocket = null;
int recvMsgSize = 0;
InputStream in = null;
try {
//开一个监听2333端口的TCP服务
serverSocket = new ServerSocket(2333);
byte[] recvBuf = new byte[1024];
while (true) {
//探听有没有新的客户端连接进来,没有就阻塞
Socket clntSocket = serverSocket.accept();
//通过跟服务连接上的客户端socket,拿到客户端地址
SocketAddress clientAddress = clntSocket.getRemoteSocketAddress();
System.out.println("连接成功,处理客户端:" + clientAddress);
// 延迟10ms 来模拟粘包现象
Thread.sleep(10);
//数据流
in = clntSocket.getInputStream();
//读取发送的数据,当客户端未断开连接,且不往服务端发数据的时候,说明一直处于准备读的状态,会一直阻塞下去,直到有数据写入(读就绪)
while ((recvMsgSize = in.read(recvBuf)) != -1) {
byte[] temp = new byte[recvMsgSize];
System.arraycopy(recvBuf, 0, temp, 0, recvMsgSize);
System.out.println("收到客户端" + clientAddress + "的消息内容:" + new String(temp)); //打印消息
}
System.out.println("-----------------------------------");
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (serverSocket != null) {
System.out.println("socket关闭!");
serverSocket.close();
}
if (in != null) {
System.out.println("stream连接关闭!");
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
具体逻辑看注释即可;
由于,服务器端延时时间为10ms,由于数据处理不及时,会导致粘包现象
将服务器端的延迟时间改为0ms 就会收到正确的数据:
解决方案
- 传输的消息是定长的,长度已知,接收方就可以根据已知长度来获取数据即可;
- 在数据包的尾部加上特殊标记作为结束条件;
- 将消息分为消息头和消息体。消息头中包含了消息的总长度;
二、解决方案之自定义协议类型
2.1 NIO编程
由于,BIO是同步非阻塞,在服务端需要用一个线程去处理一个连接,虽然,可以使用线程池来实现线程的复用,但是,大多数场景下不太适用(感觉像文件传输,这种一直在传输数据的场景可以进行使用);所以,选择NIO来进行编程;对于,NIO的简介,可以看我的另一篇文章。NIO简介及背景
2.2 编程实现
思路:将消息分为消息头和消息体,根据消息头来接收一个包;来作为一个数据包;
因为,我们是需要解决当Buffer中数据比较多的情况下(模拟粘包现象),来看能否是否将数据正常分割出来;
解决方案如下:
- 客户端向服务器连续发送五条消息
- 服务器selector收到客户端状态为可读时,先延时1s,确保客户端的五条数据都在一个Buffer中;然后,对数据进行处理,看看数据是否是粘包的数据;
2.2.1 添加长度首部
在发送端需要给待发送的数据添加固定的首部,然后再发送出去,然后,在接收端需要根据这个首部的长度信息来进行数据包的组合或拆分;这个添加首部的工具类如下:
/**
* @auther:lgb
* @Date: 2021/5/3
*/
public class PacketWrapper {
private int length;
private byte[] payload;
public PacketWrapper(String payload) {
this.payload = payload.getBytes();
this.length = this.payload.length;
}
public PacketWrapper(byte[] payload) {
this.payload = payload;
this.length = this.payload.length;
}
public byte[] getBytes() {
ByteBuffer byteBuffer = ByteBuffer.allocate(this.length + 4);
byteBuffer.putInt(this.length);
System.out.println("客户端发送的长度为:" + this.length);
byteBuffer.put(payload);
return byteBuffer.array();
}
}
复制代码
2.2.2 客户端代码
/**
* @auther:lgb
* @Date: 2021/5/3
*/
public class NIOTcpClient {
private static final int PORT = 5555;
private static final String IP_ADDRESS = "localhost";
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
socketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT));
while (!socketChannel.finishConnect()) {};
new Thread(new SendRunnable(socketChannel)).start();
System.out.println("Connecting to " + IP_ADDRESS + " on " + PORT);
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
StringBuilder sb = new StringBuilder();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (channel.read(byteBuffer) > 0) {
byteBuffer.flip();
sb.append(new String(byteBuffer.array()));
byteBuffer.clear();
}
System.out.println("[server] " + sb.toString());
}
iterator.remove();
}
}
}
private static class SendRunnable implements Runnable {
private SocketChannel socketChannel;
public SendRunnable(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
System.out.println("Type to send message:");
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
socketChannel.socket().setSendBufferSize(102400);
} catch (SocketException e) {
e.printStackTrace();
}
int i = 0;
while (i < 5) {
try {
String msg = "客户端发送了第" + (i++) + "条消息";
// buffer.flip();
// buffer.compact(); // 要从读模式到写模式 使用compact()才有意义;
buffer.clear();
byte[] bytes = new PacketWrapper(msg).getBytes();
System.out.println(msg);
buffer.put(bytes);
// 变成读模式
buffer.flip();
while (buffer.hasRemaining()) {
//给服务写消息
socketChannel.write(buffer);
}
Thread.sleep(20);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
复制代码
2.2.3 服务端代码
/**
* @auther:lgb
* @Date: 2021/5/3
*/
public class NIOTcpServer {
private static final int PORT = 5555;
private static ByteBuffer byteBuffer = ByteBuffer.allocate(10240);
private static int number = 0;
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server start on port " + PORT + " ...");
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (!selectionKey.isValid())
continue;
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
// start the thread of writing
new Thread(new SendRunnable(socketChannel)).start();
Socket socket = socketChannel.socket();
System.out.println("Get a client, the remote client address: " + socket.getRemoteSocketAddress());
} else if (selectionKey.isReadable()) {
// Read data from channel to buffer
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.socket().setReceiveBufferSize(102400);
String remoteAddress = socketChannel.socket().getRemoteSocketAddress().toString();
processByHead(socketChannel);
}
iterator.remove();
}
}
}
private static class SendRunnable implements Runnable {
private SocketChannel socketChannel;
public SendRunnable(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void run() {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
try {
String msg = bufferedReader.readLine();
this.socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private static void processByHead(SocketChannel socketChannel) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (socketChannel.read(byteBuffer) > 0) {
int position = byteBuffer.position();
int limit = byteBuffer.limit();
byteBuffer.flip();
while (byteBuffer.remaining() > 4) {
System.out.println("remaining :" + byteBuffer.remaining());
System.out.println("position :" + byteBuffer.position());
System.out.println("limit :" + byteBuffer.limit());
// 从给定索引值(position)中,开始取出4个字节 根据字节顺序将它们组合成为一个int
int length = byteBuffer.getInt();
if (length != 0) {
System.out.println("客户端发送的长度为:" + length);
if (byteBuffer.remaining() < length) {
byteBuffer.position(position);
byteBuffer.limit(limit);
continue;
}
byte[] data = new byte[length];
byteBuffer.get(data, 0, length);
System.out.println(new String(data) + " <---> " + number++);
} else {
break;
}
}
// 需要变回写状态,客户端还是需要写数据的;注意,读一半数据,然后,再写数据 这时候用compact()才有意义
byteBuffer.compact();
}
}
}
复制代码
测试如下:
客户端结果:
服务器结果:
结果分析:
因为,客户端发送过来的是5条消息,服务器第一次收到position = 0;limit = 175;说明,这些数据包一起被读取到Buffer中,存在粘包的前提条件;
由于,读取前4个字节,发现长度为31;所以,会先读取后31个字节;然后,读取数据为客户端发送了第0条消息。此时,position = 35 remaining = 140 还会继续读;直到remain < 4 时,才结束读取;
综上:实测效果满足开头提出的解决粘包现象;
gitee代码下载
相关借鉴博客如下:
nio原理