使用Netty解决TCP粘包-拆包问题

你的名字 2022-05-13 10:06 464阅读 0赞

TCP粘包和拆包

TCP底层并不知道上层业务数据的具体含义,它会根据缓冲区的实际情况进行包的拆分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能将多个小的数据包封装成一个大的数据包发送,这就是所谓的TCP粘包/拆包问题。

TCP粘包/拆包的原因

1.应用程序写入的字节大小大于套接字发送缓冲区大小;
2.进行MSS大小的TCP分段;
3.以太网帧的payload大于MTU进行IP分片。

解决策略

由于底层的TCP协议无法理解上层业务数据,所以在底层是无法保证数据包不被拆分和重组的。只能通过上层的业务协议栈设计来解决,根据业务的主流 协议的解决方案,归纳为如下:
(1)消息定长,例如每个报文的大小定位200字节,如果不够,空位补空格;
(2)包结尾增加回车换行符进行分割,例如FTP协议;
(3)将消息分为消息头和消息体,消息头中包含标识消息的总长度(或者消息体长度)的字段,通常涉及思路是消息头的第一个字段使用int32来表示消息的总长度;
(4)更复杂的应用层协议。

未考虑TCP粘包导致的功能异常案例

服务端代码片段:

  1. class TimeServerHandler extends ChannelHandlerAdapter {
  2. private int counter = 0;
  3. @Override
  4. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  5. ctx.close();
  6. }
  7. @Override
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  9. // ByteBuf类似于JDK中的ByteBuffer,但提供更强大跟灵活的功能
  10. ByteBuf buf = (ByteBuf) msg;
  11. byte[] req = new byte[buf.readableBytes()]; // 根据缓冲区可读字节数构建字节数组
  12. buf.readBytes(req); // 将缓冲区的直接数组复制到req
  13. String body = new String(req, "UTF-8");
  14. System.out.println("接收到客户端请求:" + body + ",counter:" + ++counter);
  15. // 如果接受到的消息时Server Time,则异步将服务端当前时间发送给客户端。
  16. if ("Server Time".equalsIgnoreCase(body)) {
  17. ByteBuf resp = Unpooled.copiedBuffer((new Date()).toString().getBytes());
  18. // 这里write方法只是将数据写入缓冲区,并没有真正发送
  19. ctx.write(resp);
  20. }
  21. }
  22. @Override
  23. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  24. // 将缓冲区的数据写入SocketChannel
  25. ctx.flush();
  26. }
  27. }

这里增加了counter来统计接收到报文的数量。

客户端代码片段:

  1. class TimeClientHandler extends ChannelHandlerAdapter {
  2. private ByteBuf msgSendBuf;
  3. private int counter = 0;
  4. public TimeClientHandler() {
  5. }
  6. @Override
  7. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  8. ctx.close();
  9. }
  10. @Override
  11. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  12. // 链路建立成功后,将Server Time请求发送给服务端
  13. for (int i=0;i<100;i++) { // 这里循环发送100次请求
  14. // 待发送数据
  15. String req = "Server Time";
  16. msgSendBuf = Unpooled.copiedBuffer(req.getBytes());
  17. ctx.writeAndFlush(msgSendBuf);
  18. }
  19. }
  20. @Override
  21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  22. // 接收服务端响应
  23. ByteBuf buf = (ByteBuf) msg;
  24. byte[] resp = new byte[buf.readableBytes()];
  25. buf.readBytes(resp);
  26. String response = new String(resp, "UTF-8");
  27. System.out.println("接收到服务端响应:" + response + ",counter:" + ++counter);
  28. }
  29. }

客户端在连接上服务端后,循环发送100条报文。在Handler类中增加了counter来统计接收到的服务器响应的次数。

运行结果如下:
服务端
68864170.jpg

客户端:
57646709.jpg

可以看到服务端只接收到2次,客户端并没有接收到服务端的响应。
服务端只接收到2次的原因是发生了TCP粘包,第1次接收的报文长度是1024。
客户端没有接收到服务器响应是因为由于发送了粘包,导致不符合服务端响应的条件。

使用LineBasedFrameDecoder和StringDecoder解决粘包

服务端:

  1. protected void initChannel(SocketChannel socketChannel) throws Exception {
  2. socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
  3. socketChannel.pipeline().addLast(new StringDecoder());
  4. socketChannel.pipeline().addLast(new TimeServerHandler());
  5. }

增加了LineBasedFrameDecoder和StringDecoder解码器。

TimeServerHandler的channelRead方法修改:

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. String body = (String) msg;
  4. System.out.println("接收到客户端请求:" + body + ",counter:" + ++counter);
  5. // 如果接受到的消息时Server Time,则异步将服务端当前时间发送给客户端。
  6. if ("Server Time".equalsIgnoreCase(body)) {
  7. byte[] data = ((new Date()).toString() + System.getProperty("line.separator")).getBytes();
  8. ByteBuf resp = Unpooled.copiedBuffer(data);
  9. // 这里write方法只是将数据写入缓冲区,并没有真正发送
  10. ctx.write(resp);
  11. }
  12. }

1.直接将msg转换为String;
2.响应的消息最后增加了换行符。

客户端:

  1. b.group(group)
  2. .channel(NioSocketChannel.class) // 设置线程的Channel
  3. .option(ChannelOption.TCP_NODELAY, true) // 设置NIOSocketChannel的参数
  4. .handler(new ChannelInitializer<SocketChannel>() { // 绑定I/O事件处理类
  5. protected void initChannel(SocketChannel socketChannel) throws Exception {
  6. socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
  7. socketChannel.pipeline().addLast(new StringDecoder());
  8. socketChannel.pipeline().addLast(new TimeClientHandler());
  9. }
  10. });

增加了LineBasedFrameDecoder和StringDecoder解码器。

TimeClientHandler的channelActive和channelRead方法修改:

  1. @Override
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  3. // 链路建立成功后,将Server Time请求发送给服务端
  4. String req = "Server Time" + System.getProperty("line.separator");
  5. for (int i=0;i<100;i++) { // 这里循环发送100次请求
  6. // 待发送数据
  7. msgSendBuf = Unpooled.copiedBuffer(req.getBytes());
  8. ctx.writeAndFlush(msgSendBuf);
  9. }
  10. }
  11. @Override
  12. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  13. // 接收服务端响应
  14. String response = (String) msg;
  15. System.out.println("接收到服务端响应:" + response + ",counter:" + ++counter);
  16. }

1.发送的报文最后增加换行符;
2.接收的消息msg直接转为String。

运行结果:
服务端
72973720.jpg
客户端
8257291.jpg

LineBasedFrameDecoder和StringDecoder的原理分析

LineBasedFrameDecoder的工作原理是依次遍历ByteBuf中的可读直接,看是否有\r\n或\n,如果有,就以此位置为结束为止,从可读索引到结束位置区间的字节就组成了一行。
它是以换行符为标志的解码器。支持携带结束符或不携带结束符两种解码方式,同时支持配置单行的最大长度。如果在连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略之前读到的异常码流。

StringDecoder的功能非常简单,就是将接受到的对象转换层字符串,然后继续调用后面的Handler。
LineBasedFrameDecoder和StringDecoder组合起来就是支持换行的文本解码器,被设计用来支持TCP粘包和拆包。

参考《Netty权威指南》

发表评论

表情:
评论列表 (有 0 条评论,464人围观)

还没有评论,来说两句吧...

相关阅读

    相关 NettyTCP问题

           粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生这个问