TCP粘包/拆包现象模拟及其解决方案

粘包/拆包背景

首先,我们知道TCP是字节流,就是像我们的水流一样,是没有界限的;所以,它会出现粘包/拆包的问题;而像我们的UDP是面向报文的,所以,UDP就不会出现这个问题;
TCP分包:分包的话,以下情况可能会导致分包:1.像我们写入套接字缓存区的字节大小超过了缓冲区最大值;2.网络发送数据时,受限于MSS最大报文长度的限制;会导致拆包;3.在数据链路层中,因为不同的数据链路层是不一样的,所以,不一定是以太网协议,所以,也有可能传输数据的单元较小,会导致IP分片;
如果说:每个请求的数据量并不大,获取就避免了拆包问题;但是,又产生了另外一个问题:TCP粘包

一、TCP粘包

每次发送的数据量是不定的。有可能仅仅是为了发送一个心跳,下一秒就是一个大文件传输。所以我们只能面对去解决多变的业务场景。TCP 协议为了提高传输效率,同时兼容更加复杂的环境,往往做了这方面的优化。例如说,发送端为了提高网络传输的成功率(),一般都是收集到足够的数据才发送一个 TCP 段。这样如果你有多个请求的数据量极小,那么 TCP 发送出去的数据可能包含了多个请求的数据,这就导致了多个数据包粘在一起了;而同样,由于接收方不及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)。这就是 TCP 的粘包行为。
代码模拟
客户端:

/**
 * @auther:lgb
 * @Date: 2021/5/3
 */
public class BIOTcpClient {
    public static void main(String[] args)  {
        //定义缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = null;
        try {
            //打开SocketChannel
            socketChannel = SocketChannel.open();
            //设置为非阻塞模式
            socketChannel.configureBlocking(false);
            //连接服务
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 2333));
            while (true) {
                //这里的finishConnect是尝试连接,有可能返回false,因此使用死循环进行连接检查,确保连接已经正常建立。
                if (socketChannel.finishConnect()) {
                    System.out.println("客户端已连接到服务器");
                    int i = 0;
                    while (i < 5) {
                        //隔一耗秒钟写一条
                        TimeUnit.MILLISECONDS.sleep(1);
                        String info = "来自客户端的第" + (i++) + "条消息";
                        buffer.clear();
                        buffer.put(info.getBytes());
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            //给服务写消息
                            socketChannel.write(buffer);
                        }
                    }
                    break;
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (socketChannel != null) {
                    System.out.println("客户端Channel关闭");
                    socketChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


复制代码

服务器端:

/**
 * @auther:lgb
 * @Date: 2021/5/3
 */
public class BIOTcpServer {
    public static void main(String[] args) {
        ServerSocket serverSocket = null;
        int recvMsgSize = 0;
        InputStream in = null;
        try {
            //开一个监听2333端口的TCP服务
            serverSocket = new ServerSocket(2333);
            byte[] recvBuf = new byte[1024];
            while (true) {
                //探听有没有新的客户端连接进来,没有就阻塞
                Socket clntSocket = serverSocket.accept();
                //通过跟服务连接上的客户端socket,拿到客户端地址
                SocketAddress clientAddress = clntSocket.getRemoteSocketAddress();
                System.out.println("连接成功,处理客户端:" + clientAddress);
                // 延迟10ms 来模拟粘包现象
                Thread.sleep(10);
                //数据流
                in = clntSocket.getInputStream();
                //读取发送的数据,当客户端未断开连接,且不往服务端发数据的时候,说明一直处于准备读的状态,会一直阻塞下去,直到有数据写入(读就绪)
                while ((recvMsgSize = in.read(recvBuf)) != -1) {
                    byte[] temp = new byte[recvMsgSize];
                    System.arraycopy(recvBuf, 0, temp, 0, recvMsgSize);
                    System.out.println("收到客户端" + clientAddress + "的消息内容:" + new String(temp)); //打印消息
                }
                System.out.println("-----------------------------------");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    System.out.println("socket关闭!");
                    serverSocket.close();
                }
                if (in != null) {
                    System.out.println("stream连接关闭!");
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
复制代码

具体逻辑看注释即可;
由于,服务器端延时时间为10ms,由于数据处理不及时,会导致粘包现象

image.png
将服务器端的延迟时间改为0ms 就会收到正确的数据:

image.png
解决方案

  1. 传输的消息是定长的,长度已知,接收方就可以根据已知长度来获取数据即可;
  2. 在数据包的尾部加上特殊标记作为结束条件;
  3. 将消息分为消息头和消息体。消息头中包含了消息的总长度;

二、解决方案之自定义协议类型

2.1 NIO编程

由于,BIO是同步非阻塞,在服务端需要用一个线程去处理一个连接,虽然,可以使用线程池来实现线程的复用,但是,大多数场景下不太适用(感觉像文件传输,这种一直在传输数据的场景可以进行使用);所以,选择NIO来进行编程;对于,NIO的简介,可以看我的另一篇文章。NIO简介及背景

2.2 编程实现

思路:将消息分为消息头和消息体,根据消息头来接收一个包;来作为一个数据包;
因为,我们是需要解决当Buffer中数据比较多的情况下(模拟粘包现象),来看能否是否将数据正常分割出来;
解决方案如下:

  1. 客户端向服务器连续发送五条消息
  2. 服务器selector收到客户端状态为可读时,先延时1s,确保客户端的五条数据都在一个Buffer中;然后,对数据进行处理,看看数据是否是粘包的数据;

2.2.1 添加长度首部

在发送端需要给待发送的数据添加固定的首部,然后再发送出去,然后,在接收端需要根据这个首部的长度信息来进行数据包的组合或拆分;这个添加首部的工具类如下:

/**
 * @auther:lgb
 * @Date: 2021/5/3
 */
public class PacketWrapper {

    private int length;
    private byte[] payload;

    public PacketWrapper(String payload) {
        this.payload = payload.getBytes();
        this.length = this.payload.length;
    }

    public PacketWrapper(byte[] payload) {
        this.payload = payload;
        this.length = this.payload.length;
    }

    public byte[] getBytes() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(this.length + 4);
        byteBuffer.putInt(this.length);
        System.out.println("客户端发送的长度为:" + this.length);
        byteBuffer.put(payload);
        return byteBuffer.array();
    }
    
}
复制代码

2.2.2 客户端代码

/**
 * @auther:lgb
 * @Date: 2021/5/3
 */
public class NIOTcpClient {
    private static final int PORT = 5555;
    private static final String IP_ADDRESS = "localhost";
    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);

        socketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT));
        while (!socketChannel.finishConnect()) {};

        new Thread(new SendRunnable(socketChannel)).start();

        System.out.println("Connecting to " + IP_ADDRESS + " on " + PORT);
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    StringBuilder sb = new StringBuilder();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    while (channel.read(byteBuffer) > 0) {
                        byteBuffer.flip();
                        sb.append(new String(byteBuffer.array()));
                        byteBuffer.clear();
                    }
                    System.out.println("[server] " + sb.toString());
                }
                iterator.remove();
            }
        }
    }

    private static class SendRunnable implements Runnable {

        private SocketChannel socketChannel;
        public SendRunnable(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void run() {
            System.out.println("Type to send message:");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            try {
                socketChannel.socket().setSendBufferSize(102400);
            } catch (SocketException e) {
                e.printStackTrace();
            }
            int i = 0;
            while (i < 5) {
                try {
                    String msg = "客户端发送了第" + (i++) + "条消息";
                    // buffer.flip();
                    // buffer.compact(); // 要从读模式到写模式 使用compact()才有意义;
                    buffer.clear();
                    byte[] bytes = new PacketWrapper(msg).getBytes();
                    System.out.println(msg);
                    buffer.put(bytes);
                    // 变成读模式
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        //给服务写消息
                       socketChannel.write(buffer);


                    }
                    Thread.sleep(20);

                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

复制代码

2.2.3 服务端代码

/**
 * @auther:lgb
 * @Date: 2021/5/3
 */
public class NIOTcpServer {

    private static final int PORT = 5555;
    private static ByteBuffer byteBuffer = ByteBuffer.allocate(10240);
    private static int number = 0;

    public static void main(String[] args) throws IOException {

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("server start on port " + PORT + " ...");

        while (true) {

            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();

                if (!selectionKey.isValid())
                    continue;

                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = serverChannel.accept();
                    socketChannel.configureBlocking(false);
                    SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
                    // start the thread of writing
                    new Thread(new SendRunnable(socketChannel)).start();
                    Socket socket = socketChannel.socket();
                    System.out.println("Get a client, the remote client address: " + socket.getRemoteSocketAddress());
                } else if (selectionKey.isReadable()) {
                    // Read data from channel to buffer
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    socketChannel.socket().setReceiveBufferSize(102400);
                    String remoteAddress = socketChannel.socket().getRemoteSocketAddress().toString();

                    processByHead(socketChannel);
                }

                iterator.remove();
            }
        }
    }

    private static class SendRunnable implements Runnable {

        private SocketChannel socketChannel;

        public SendRunnable(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        public void run() {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                try {
                    String msg = bufferedReader.readLine();
                    this.socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    private static void processByHead(SocketChannel socketChannel) throws IOException {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (socketChannel.read(byteBuffer) > 0) {
            int position = byteBuffer.position();
            int limit = byteBuffer.limit();
            byteBuffer.flip();

            while (byteBuffer.remaining() > 4) {
                System.out.println("remaining :" + byteBuffer.remaining());
                System.out.println("position :" + byteBuffer.position());
                System.out.println("limit :" + byteBuffer.limit());
                // 从给定索引值(position)中,开始取出4个字节 根据字节顺序将它们组合成为一个int
                int length = byteBuffer.getInt();
                if (length != 0) {
                    System.out.println("客户端发送的长度为:" + length);
                    if (byteBuffer.remaining() < length) {
                        byteBuffer.position(position);
                        byteBuffer.limit(limit);
                        continue;
                    }

                    byte[] data = new byte[length];
                    byteBuffer.get(data, 0, length);
                    System.out.println(new String(data) + " <---> " + number++);

                } else {
                    break;
                }

            }
            // 需要变回写状态,客户端还是需要写数据的;注意,读一半数据,然后,再写数据 这时候用compact()才有意义
            byteBuffer.compact();

        }
    }


}

复制代码

测试如下:
客户端结果:

image.png
服务器结果:

image.png
结果分析:
因为,客户端发送过来的是5条消息,服务器第一次收到position = 0;limit = 175;说明,这些数据包一起被读取到Buffer中,存在粘包的前提条件;
由于,读取前4个字节,发现长度为31;所以,会先读取后31个字节;然后,读取数据为客户端发送了第0条消息。此时,position = 35 remaining = 140 还会继续读;直到remain < 4 时,才结束读取;
综上:实测效果满足开头提出的解决粘包现象;
gitee代码下载
相关借鉴博客如下:
nio原理

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享