Netty解决TCP粘包/拆包的问题

叁歲伎倆 2022-02-20 00:48 501阅读 0赞

什么是TCP粘包/拆包

  首先要明确, 粘包问题中的 “包”, 是指应用层的数据包.在TCP的协议头中, 没有如同UDP一样的 “报文长度” 字段,但是有一个序号字段.
  站在传输层的角度, TCP是一个一个报文传过来的. 按照序号排好序放在缓冲区中.
  站在应用层的角度, 看到的只是一串连续的字节数据.那么应用程序看到了这一连串的字节数据, 就不知道从哪个部分开始到哪个部分是一个完整的应用层数据包.此时数据之间就没有了边界, 就产生了粘包问题,那么如何避免粘包问题呢?归根结底就是一句话, 明确两个包之间的边界

在这里插入图片描述

  如图所示,假设客户端分别发送两个数据包D1和D2给服务器端,由于服务器端一次读取到的字节数是不确定的,所以可能存在以下几种情况:

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,这种情况没有粘包和拆包
  2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D2包和D1包的部分内容,第二次读取到了D1包剩余的内容,这被称为TCP拆包
  4. 和第3中情况相反,也是拆包
  5. 如果服务端的TCP接收滑窗非常小,而数据包D1和D2比较大,那么服务器要分多次才能将D1和D2完全接收完,期间发生了多次拆包

未考虑TCP粘包案例

  上面我们介绍了TCP粘包和拆包的原因,现在我们通过Netty案例来实现下不考虑TCP粘包和拆包问题而造成的影响。代码如下

服务端代码

  服务端每读取到一条消息,就计数一次,然后发送应答消息给客户端,按照设计,服务端接受到的消息总数应该跟客户端发送消息总数相同,而且请求消息删除回车换行符后应该为”Query time order”.

  1. package com.dpb.netty.demo1;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.LineBasedFrameDecoder;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. public class TimerServer {
  12. public void bind(int port) throws Exception{
  13. // 配置服务端的NIO线程组
  14. // 服务端接受客户端的连接
  15. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  16. // 进行SocketChannel的网络读写
  17. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  18. try {
  19. // 用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度
  20. ServerBootstrap b = new ServerBootstrap();
  21. b.group(bossGroup,workerGroup)
  22. // 设置Channel
  23. .channel(NioServerSocketChannel.class)
  24. // 设置TCP参数
  25. .option(ChannelOption.SO_BACKLOG, 1024)
  26. // 绑定I/O事件的处理类ChildChannelHandler
  27. .childHandler(new ChildChannelHandler());
  28. // 绑定端口,同步等待成功
  29. ChannelFuture f = b.bind(port).sync();
  30. // 等待服务端监听端口关闭
  31. f.channel().closeFuture().sync();
  32. } finally {
  33. // 优雅退出,释放线程池资源
  34. bossGroup.shutdownGracefully();
  35. workerGroup.shutdownGracefully();
  36. }
  37. }
  38. private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
  39. @Override
  40. protected void initChannel(SocketChannel arg0) throws Exception {
  41. arg0.pipeline().addLast(new TimerServerHandler());
  42. }
  43. }
  44. public static void main(String[] args) throws Exception {
  45. int port = 8080;
  46. new TimerServer().bind(port);
  47. }
  48. }
  49. package com.dpb.netty.demo1;
  50. import java.nio.ByteBuffer;
  51. import io.netty.buffer.ByteBuf;
  52. import io.netty.buffer.Unpooled;
  53. import io.netty.channel.ChannelHandlerAdapter;
  54. import io.netty.channel.ChannelHandlerContext;
  55. /** * 用于对网络事件进行读写操作 * @author 波波烤鸭 * @email dengpbs@163.com * */
  56. public class TimerServerHandler extends ChannelHandlerAdapter{
  57. private int counter;
  58. @Override
  59. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  60. // ByteBuf 类似于NIO中的java.nio.ByteBuffer对象,
  61. ByteBuf buf = (ByteBuf) msg;
  62. byte[] req = new byte[buf.readableBytes()];
  63. buf.readBytes(req);
  64. String body = new String(req,"utf-8")
  65. .substring(0,req.length-System.getProperty("line.separator").length());
  66. System.out.println("The time server receive order : "+body+",counter is:"+ ++counter);
  67. String currentTime = "Query time order".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
  68. ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
  69. ctx.writeAndFlush(resp);
  70. }
  71. @Override
  72. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  73. // TODO Auto-generated method stub
  74. ctx.flush();
  75. }
  76. @Override
  77. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  78. // TODO Auto-generated method stub
  79. ctx.close();
  80. }
  81. }

客户端代码

  客户端跟服务器连接建立成功后,循环发送100条消息,每发送一条就刷新一次,保证每条消息都会被写入Channel中,按照设计,服务端应该接收到100条查询时间指令的请求消息,客户端每接收到服务端一条应答消息后,就打印一次计数器,按照设计客户端应该打印100次服务端的系统时间。

  1. package com.dpb.netty.demo1;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. public class TimeClient {
  11. public static void main(String[] args) throws Exception {
  12. int port = 8080;
  13. new TimeClient().connect(port, "127.0.0.1");
  14. }
  15. public void connect(int port,String host)throws Exception{
  16. // 配置客户端NIO线程组
  17. EventLoopGroup group = new NioEventLoopGroup();
  18. try{
  19. Bootstrap b = new Bootstrap();
  20. b.group(group).channel(NioSocketChannel.class)
  21. .option(ChannelOption.TCP_NODELAY, true)
  22. .handler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel arg0) throws Exception {
  25. arg0.pipeline().addLast(new TimeClientHandler());
  26. }
  27. });
  28. // 发起异步连接操作
  29. ChannelFuture f = b.connect(host,port).sync();
  30. // 等待客户端链路关闭
  31. f.channel().closeFuture().sync();
  32. }catch(Exception e){
  33. e.printStackTrace();
  34. }finally{
  35. // 优雅退出,释放NIO线程组
  36. group.shutdownGracefully();
  37. }
  38. }
  39. }
  40. package com.dpb.netty.demo1;
  41. import java.util.logging.Logger;
  42. import io.netty.buffer.ByteBuf;
  43. import io.netty.buffer.Unpooled;
  44. import io.netty.channel.ChannelHandlerAdapter;
  45. import io.netty.channel.ChannelHandlerContext;
  46. public class TimeClientHandler extends ChannelHandlerAdapter{
  47. private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
  48. private int counter;
  49. private byte[] req;
  50. public TimeClientHandler(){
  51. // 客户端每次发送的消息后面都跟了一个换行符
  52. req = ("Query time order"+System.getProperty("line.separator")).getBytes();
  53. }
  54. @Override
  55. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  56. ByteBuf message = null;
  57. for(int i =0;i < 100 ; i++){
  58. message = Unpooled.buffer(req.length);
  59. message.writeBytes(req);
  60. ctx.writeAndFlush(message);
  61. }
  62. }
  63. @Override
  64. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  65. ByteBuf buf = (ByteBuf) msg;
  66. byte[] req = new byte[buf.readableBytes()];
  67. buf.readBytes(req);
  68. String body = new String(req,"UTF-8");
  69. System.out.println("Now is :"+body+";counter is :"+ ++counter);
  70. }
  71. @Override
  72. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  73. // TODO Auto-generated method stub
  74. logger.warning("Unexpected exception from downstream:"+cause.getMessage());
  75. ctx.close();
  76. }
  77. }

测试结果

服务端输出结果:

  1. The time server receive order : Query time order
  2. Query time order
  3. ... 省略55
  4. Query time ord,counter is:1
  5. The time server receive order :
  6. Query time order
  7. ... 省略41
  8. Query time order,counter is:2

客户端输出结果

  1. Now is :BAD ORDERBAD BAD ORDERBAD ;counter is :1

  服务端的运行结果表明它只接受到了两条消息,第一条包含57个”Query time order”指令,第二条包含43条”Query time order”指令,总数刚好是100条,我们期待服务器会接收到100次,结果只接受到了两次,说明发送了TCP粘包。而客户端设计应该受到100条响应,实际服务器发送了两次响应,客户端只受到了一条响应,说明服务器返回给客户端的应答信息也发生了粘包问题。

Netty解决TCP粘包

  为了解决TCP粘包/拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包,此处我们使用LineBasedFrameDecoder来解决,实现如下

服务端修改

在这里插入图片描述

在这里插入图片描述

客户端修改

在这里插入图片描述
在这里插入图片描述

测试结果

服务端输出

在这里插入图片描述

客户端输出

在这里插入图片描述

  程序的运行结果完全符合我们的预期,说明通过LineBasedFrameDecoder和StringDecoder成功解决了TCP粘包导致的读半包问题,对于使用者来说,只要将支持半包解码的Handler添加到ChannelPiPeline中即可,不需要额外的代码,用户使用起来非常简单。

LineBasedFrameDecoder和StringDecoder的原理

  的工作原理是依次遍历ByteBuf中的可读字节,判断看是否有”\n”或者”\r\n”,如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行,它是以换行符为结束标志的解码器,StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Handler, LineBasedFrameDecoder + StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包问题。

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,501人围观)

还没有评论,来说两句吧...

相关阅读

    相关 NettyTCP问题

           粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生这个问