Netty2:粘包/拆包问题与使用LineBasedFrameDecoder的解决方案

港控/mmm° 2023-07-18 14:06 84阅读 0赞

什么是粘包、拆包

粘包、拆包是Socket编程中最常遇见的一个问题,本文来研究一下Netty是如何解决粘包、拆包的,首先我们从什么是粘包、拆包开始说起:

  1. TCP是个"流"协议,所谓流,就是没有界限的一串数据,TCP底层并不了解上层业务的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上:
  2. * 一个完整的包可能会被TCP拆分为多个包进行发送(拆包)
  3. * 多个小的包也有可能被封装成一个大的包进行发送(粘包)

这就是所谓的TCP粘包与拆包

下图演示了粘包、拆包的场景:

format_png

基本上有四种情况:

  • Data1、Data2都分开发送到了Server端,没有产生粘包与拆包的情况
  • Data1、Data2数据粘在了一起,打成了一个大的包发送到了Server端,这种情况就是粘包
  • Data1被分成Data1_1与Data1_2,Data1_1先到服务端,Data1_2与Data2再到服务端,这种情况就是拆包
  • Data2被分成Data2_1与Data2_2,Data1与Data2_1先到服务端,Data2_2再到服务端,同上,这也是一种拆包的场景

粘包、拆包产生的原因

上面我们详细了解了TCP粘包与拆包,那么粘包与拆包为什么会发生呢,大致上有三种原因:

  • 应用程序写入的字节大小大于Socket发送缓冲区大小
  • 进行MSS大小的TCP,MSS是最大报文段长度的缩写,是TCP报文段中的数据字段最大长度,MSS=TCP报文段长度-TCP首部长度
  • 以太网的Payload大于MTU,进行IP分片,MTU是最大传输单元的缩写,以太网的MTU为1500字节

粘包、拆包解决策略

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:

  • 消息定长,例如每个报文的大小固定为200字节,如果不够空位补空格
  • 包尾增加回车换行符进行分割,例如FTP协议
  • 将消息分为消息头和消息体,消息头中包含表示长度的字段,通常涉及思路为消息头的第一个字段使用int32来表示消息的总长度
  • 更复杂的应用层协议

未考虑TCP粘包导致功能异常演示

基于Netty的第一篇文章《Netty1:初识Netty》,TimeServer与TimeClient不变,简单修改一下TimeServerHandler与TimeClientHandler即可以模拟出TCP粘包的情况,首先修改TimeClientHandler:

  1. 1 public class TimeClientHandler extends ChannelHandlerAdapter {
  2. 2
  3. 3 private static final Logger LOGGER = LoggerFactory.getLogger(TimeClientHandler.class);
  4. 4
  5. 5 private int counter;
  6. 6
  7. 7 private byte[] req;
  8. 8
  9. 9 public TimeClientHandler() {
  10. 10 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
  11. 11 }
  12. 12
  13. 13 @Override
  14. 14 public void channelActive(ChannelHandlerContext ctx) throws Exception {
  15. 15 ByteBuf message = null;
  16. 16 for (int i = 0; i < 100; i++) {
  17. 17 message = Unpooled.buffer(req.length);
  18. 18 message.writeBytes(req);
  19. 19 ctx.writeAndFlush(message);
  20. 20 }
  21. 21 }
  22. 22
  23. 23 @Override
  24. 24 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  25. 25 ByteBuf buf = (ByteBuf)msg;
  26. 26 byte[] req = new byte[buf.readableBytes()];
  27. 27 buf.readBytes(req);
  28. 28
  29. 29 String body = new String(req, "UTF-8");
  30. 30 System.out.println("Now is:" + body + "; the counter is:" + ++counter);
  31. 31 }
  32. 32
  33. 33 @Override
  34. 34 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  35. 35 LOGGER.warn("Unexcepted exception from downstream:" + cause.getMessage());
  36. 36 ctx.close();
  37. 37 }
  38. 38
  39. 39 }

TimeClientHandler的变化是,之前是发送一次”QUERY TIME ORDER”到服务端,现在变为发送100次“QUERY TIME ORDER”+标准换行符到服务端,并在客户端增加一个计数器,记录从服务端收到的响应次数。

服务单TimeServerHandler也简单改造一下,增加一个计数器记录一下从客户端收到的请求次数:

  1. 1 public class TimeServerHandler extends ChannelHandlerAdapter {
  2. 2
  3. 3 private int counter;
  4. 4
  5. 5 @Override
  6. 6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  7. 7 ByteBuf buf = (ByteBuf)msg;
  8. 8 byte[] req = new byte[buf.readableBytes()];
  9. 9 buf.readBytes(req);
  10. 10
  11. 11 String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
  12. 12 System.out.println("The time server receive order:" + body + "; the counter is:" + ++counter);
  13. 13
  14. 14 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
  15. 15 currentTime = currentTime + System.getProperty("line.separator");
  16. 16
  17. 17 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
  18. 18 ctx.writeAndFlush(resp);
  19. 19 }
  20. 20
  21. 21 @Override
  22. 22 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  23. 23 ctx.close();
  24. 24 }
  25. 25
  26. 26 }

按照设计,服务端应该会打印出100次”Time time server…”,客户端应当会打印出100次”Now is …”,因为客户端向服务端发送了100次”QUERY TIME ORDER”的请求,实际运行起来呢?先看一下服务端的打印:

  1. The time server receive order:QUERY TIME ORDER
  2. QUERY TIME ORDER
  3. ...省略,这里有55
  4. QUERY TIME ORD; the counter is:1
  5. The time server receive order:
  6. ...省略,这里有42
  7. QUERY TIME ORDER; the counter is:2

counter最终等于2,表明服务端实际上只收到了2条请求,很显然这里发生了粘包,即多个客户端的包合成了一个发送到了服务端,服务端每收到一个包的大小为1024字节。

接着看一下客户端的打印:

  1. Now is:BAD ORDER
  2. BAD ORDER
  3. ; the counter is:1

因为服务端只收到了2条消息,因此客户端也只会收到2条消息,因为服务端两次收到的内容都不满足”QUERY TIME ORDER”,因此返回”BAD ORDER”到客户端,但是为什么客户端的counter=1呢?回过头来仔细想想,因此服务端发送给客户端的消息也发生了粘包。因此这里简单得出一个结论:粘包/拆包不仅仅发生在客户端给服务端发送数据,服务端回数据给客户端同样有可能发生粘包/拆包

上面的例子演示了粘包,拆包其实一样的,既然可以知道服务端每收到一个包的大小为1024字节,那客户端每次发送一个大于1024字节的数据给服务端就可以了,有兴趣的朋友可以自己尝试一下。

利用LineBasedFrameDecoder解决粘包问题

为了解决TCP粘包/拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包,针对上面发送“QUERY TIME ORDER”+标准换行符的这种场景,简单使用LineBasedFrameDecoder就可以解决上面发生的粘包问题。

首先对TimeServer进行改造,加入LineBasedFrameDecoder与StringDecoder:

  1. 1 public class TimeServer {
  2. 2
  3. 3 public void bind(int port) throws Exception {
  4. 4 // NIO线程组
  5. 5 EventLoopGroup bossGroup = new NioEventLoopGroup();
  6. 6 EventLoopGroup workerGroup = new NioEventLoopGroup();
  7. 7
  8. 8 try {
  9. 9 ServerBootstrap b = new ServerBootstrap();
  10. 10 b.group(bossGroup, workerGroup)
  11. 11 .channel(NioServerSocketChannel.class)
  12. 12 .option(ChannelOption.SO_BACKLOG, 1024)
  13. 13 .childHandler(new ChildChannelHandler());
  14. 14
  15. 15 // 绑定端口,同步等待成功
  16. 16 ChannelFuture f = b.bind(port).sync();
  17. 17 // 等待服务端监听端口关闭
  18. 18 f.channel().closeFuture().sync();
  19. 19 } finally {
  20. 20 // 优雅退出,释放线程池资源
  21. 21 bossGroup.shutdownGracefully();
  22. 22 workerGroup.shutdownGracefully();
  23. 23 }
  24. 24 }
  25. 25
  26. 26 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  27. 27 @Override
  28. 28 protected void initChannel(SocketChannel arg0) throws Exception {
  29. 29 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
  30. 30 arg0.pipeline().addLast(new StringDecoder());
  31. 31 arg0.pipeline().addLast(new TimeServerHandler());
  32. 32 }
  33. 33 }
  34. 34
  35. 35 }

改造点就在29行、30行两行,加入了LineBasedFrameDecoder与StringDecoder,同时TimeServerHandler也需要相应改造:

  1. 1 public class TimeServerHandler extends ChannelHandlerAdapter {
  2. 2
  3. 3 private int counter;
  4. 4
  5. 5 @Override
  6. 6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  7. 7 String body = (String)msg;
  8. 8 System.out.println("The time server receive order:" + body + "; the counter is:" + ++counter);
  9. 9
  10. 10 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
  11. 11 currentTime = currentTime + System.getProperty("line.separator");
  12. 12
  13. 13 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
  14. 14 ctx.writeAndFlush(resp);
  15. 15 }
  16. 16
  17. 17 @Override
  18. 18 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  19. 19 ctx.close();
  20. 20 }
  21. 21
  22. 22 }

改造点在第7行,由于使用了StringDecoder,因此channelRead的第二个参数msg不再是ByteBuf类型而是String类型,因此这里只需要做一次String强转即可。

TimeClient改造类似:

  1. 1 public class TimeClient {
  2. 2
  3. 3 public void connect(int port, String host) throws Exception {
  4. 4 EventLoopGroup group = new NioEventLoopGroup();
  5. 5 try {
  6. 6 Bootstrap b = new Bootstrap();
  7. 7
  8. 8 b.group(group)
  9. 9 .channel(NioSocketChannel.class)
  10. 10 .option(ChannelOption.TCP_NODELAY, true)
  11. 11 .handler(new ChannelInitializer<SocketChannel>() {
  12. 12 protected void initChannel(SocketChannel ch) throws Exception {
  13. 13 ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  14. 14 ch.pipeline().addLast(new StringDecoder());
  15. 15 ch.pipeline().addLast(new TimeClientHandler());
  16. 16 };
  17. 17 });
  18. 18
  19. 19 // 发起异步连接操作
  20. 20 ChannelFuture f = b.connect(host, port).sync();
  21. 21 // 等待客户端连接关闭
  22. 22 f.channel().closeFuture().sync();
  23. 23 } finally {
  24. 24 // 优雅退出,释放NIO线程组
  25. 25 group.shutdownGracefully();
  26. 26 }
  27. 27 }
  28. 28
  29. 29 }

第13行、第14行这两行加入了LineBasedFrameDecoder与StringDecoder,TimeClientHandler相应改造:

  1. 1 public class TimeClientHandler extends ChannelHandlerAdapter {
  2. 2
  3. 3 private static final Logger LOGGER = LoggerFactory.getLogger(TimeClientHandler.class);
  4. 4
  5. 5 private int counter;
  6. 6
  7. 7 private byte[] req;
  8. 8
  9. 9 public TimeClientHandler() {
  10. 10 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
  11. 11 }
  12. 12
  13. 13 @Override
  14. 14 public void channelActive(ChannelHandlerContext ctx) throws Exception {
  15. 15 ByteBuf message = null;
  16. 16 for (int i = 0; i < 100; i++) {
  17. 17 message = Unpooled.buffer(req.length);
  18. 18 message.writeBytes(req);
  19. 19 ctx.writeAndFlush(message);
  20. 20 }
  21. 21 }
  22. 22
  23. 23 @Override
  24. 24 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  25. 25 String body = (String)msg;
  26. 26 System.out.println("Now is:" + body + "; the counter is:" + ++counter);
  27. 27 }
  28. 28
  29. 29 @Override
  30. 30 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  31. 31 LOGGER.warn("Unexcepted exception from downstream:" + cause.getMessage());
  32. 32 ctx.close();
  33. 33 }
  34. 34
  35. 35 }

第25行这里使用String进行强转即可。接下来看一下服务端的打印:

  1. The time server receive order:QUERY TIME ORDER; the counter is:1
  2. The time server receive order:QUERY TIME ORDER; the counter is:2
  3. The time server receive order:QUERY TIME ORDER; the counter is:3
  4. The time server receive order:QUERY TIME ORDER; the counter is:4
  5. The time server receive order:QUERY TIME ORDER; the counter is:5
  6. ...
  7. The time server receive order:QUERY TIME ORDER; the counter is:98
  8. The time server receive order:QUERY TIME ORDER; the counter is:99
  9. The time server receive order:QUERY TIME ORDER; the counter is:100

看到服务端正常counter从1打印到了100,即收到了100个完整的客户端请求,客户端的打印如下:

  1. Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:1
  2. Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:2
  3. Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:3
  4. Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:4
  5. Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:5
  6. ...
  7. Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:98
  8. Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:99
  9. Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:100

看到同样的客户端也正常counter从1打印到了100,即收到了100个完整的服务端响应,至此,使用LineBasedFrameDecoder与StringDecoder解决了上述粘包问题。

整个LineBasedFrameDecoder的原理也比较简单:

  1. LineBasedFrameDecoder依次遍历ByteBuf中的可读字节,判断是否有"\n"或者"\r\n",如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行,它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度,如果连续读到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。
  2. StringDecoder的功能非常简单,就是将接收到的对象转换为字符串,然后继续调用后面的Handler
  3. LineBasedFrameDecoder+StringDecoder就是按行切换的文本解码器,被设计用于支持TCP的粘包和拆包

原文地址http://www.cnblogs.com/xrq730/p/5260294.html,转载请注明出处


另外补充两个自己项目种实际使用到的示例;

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzMyNDQ3MzIx_size_16_color_FFFFFF_t_70

  1. 文中GBDelimiterMsgDecoder:是自定义的一个类,后续把放到githup上再加地址

发表评论

表情:
评论列表 (有 0 条评论,84人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty

    拆包 拆包是指netty传输数据时候,会把一条信息,拆成几部分。比如:"hello "拆成"he" 和 "llo"等, 粘包 粘包是指将多条信息连在一起 比如发送