Netty编解码(MessagePack实现)

我会带着你远行 2022-05-14 14:12 459阅读 0赞

MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交换,但是它性能更快,序列化后码流更小。

MessagePack特点:

1.编解码高效,性能高;

2.序列化后的码流小;

3.支持跨语言。

实现:

首先需要的jar:

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.30.Final</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.msgpack</groupId>
  8. <artifactId>msgpack</artifactId>
  9. <version>0.6.12</version>
  10. </dependency>

解码器:

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.handler.codec.MessageToMessageDecoder;
  4. import org.msgpack.MessagePack;
  5. import java.util.List;
  6. public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
  7. @Override
  8. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
  9. // 获取要解码的byte数组
  10. final byte[] bytes;
  11. final int length = byteBuf.readableBytes();
  12. bytes = new byte[length];
  13. byteBuf.getBytes(byteBuf.readerIndex(),bytes,0,length);
  14. // 调用MessagePack 的read方法将其反序列化为Object对象
  15. MessagePack msgPack = new MessagePack();
  16. list.add(msgPack.read(bytes));
  17. }
  18. }

编码器:

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.handler.codec.MessageToByteEncoder;
  4. import org.msgpack.MessagePack;
  5. public class MsgPackEncoder extends MessageToByteEncoder<Object> {
  6. @Override
  7. protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
  8. MessagePack msgPack = new MessagePack();
  9. // 编码,然后转为ButyBuf传递
  10. byte[] bytes = msgPack.write(o);
  11. byteBuf.writeBytes(bytes);
  12. }
  13. }

这里传输User类,User类代码:

  1. import org.msgpack.annotation.Message;
  2. @Message
  3. public class User {
  4. private String name;
  5. private int age;
  6. private String id;
  7. private String sex;
  8. public int getAge() {
  9. return age;
  10. }
  11. public void setAge(int age) {
  12. this.age = age;
  13. }
  14. public String getId() {
  15. return id;
  16. }
  17. public void setId(String id) {
  18. this.id = id;
  19. }
  20. public String getSex() {
  21. return sex;
  22. }
  23. public void setSex(String sex) {
  24. this.sex = sex;
  25. }
  26. public String getName() {
  27. return name;
  28. }
  29. public void setName(String name) {
  30. this.name = name;
  31. }
  32. @Override
  33. public String toString() {
  34. return "User{" +
  35. "name='" + name + '\'' +
  36. ", age=" + age +
  37. ", id='" + id + '\'' +
  38. ", sex='" + sex + '\'' +
  39. '}';
  40. }
  41. }

服务端代码:

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  10. import io.netty.handler.codec.LengthFieldPrepender;
  11. import io.netty.handler.logging.LogLevel;
  12. import io.netty.handler.logging.LoggingHandler;
  13. public class EchoServer {
  14. public void bind(int port) throws Exception {
  15. EventLoopGroup bossGroup = new NioEventLoopGroup();
  16. EventLoopGroup workerGroup = new NioEventLoopGroup();
  17. try{
  18. ServerBootstrap b = new ServerBootstrap();
  19. b.group(bossGroup, workerGroup)
  20. .channel(NioServerSocketChannel.class)
  21. .option(ChannelOption.SO_BACKLOG, 100)
  22. .handler(new LoggingHandler(LogLevel.INFO))
  23. .childHandler(new ChannelInitializer<SocketChannel>() {
  24. @Override
  25. protected void initChannel(SocketChannel ch) throws Exception {
  26. // TODO Auto-generated method stub
  27. ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,
  28. 2,0,2));
  29. ch.pipeline().addLast(new MsgPackDecoder());
  30. ch.pipeline().addLast(new LengthFieldPrepender(2));
  31. ch.pipeline().addLast(new MsgPackEncoder());
  32. ch.pipeline().addLast(new EchoServerHandler());
  33. }
  34. });
  35. //bind port
  36. ChannelFuture f = b.bind(port).sync();
  37. //wait
  38. f.channel().closeFuture().sync();
  39. }finally{
  40. bossGroup.shutdownGracefully();
  41. workerGroup.shutdownGracefully();
  42. }
  43. }
  44. /**
  45. * @param args
  46. */
  47. public static void main(String[] args) {
  48. int port = 8080;
  49. try {
  50. new EchoServer().bind(port);
  51. } catch (Exception e) {
  52. // TODO Auto-generated catch block
  53. e.printStackTrace();
  54. }
  55. }
  56. }
  57. import io.netty.channel.ChannelHandlerContext;
  58. import io.netty.channel.ChannelInboundHandlerAdapter;
  59. public class EchoServerHandler extends ChannelInboundHandlerAdapter {
  60. @Override
  61. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  62. System.out.println("服务器接收msgpack消息 : "+msg+"");
  63. // 原路返回给客户端
  64. ctx.writeAndFlush(msg);
  65. }
  66. @Override
  67. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  68. ctx.flush();
  69. }
  70. @Override
  71. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  72. ctx.close();
  73. }
  74. }

客户端代码:

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  9. import io.netty.handler.codec.LengthFieldPrepender;
  10. public class EchoClient {
  11. public void connection(int port,String host) throws InterruptedException {
  12. NioEventLoopGroup workGroup = new NioEventLoopGroup();
  13. try {
  14. Bootstrap b = new Bootstrap();
  15. b.group(workGroup)
  16. .channel(NioSocketChannel.class)
  17. .option(ChannelOption.TCP_NODELAY,true)
  18. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000)
  19. .handler(new ChannelInitializer<SocketChannel>() {
  20. @Override
  21. protected void initChannel(SocketChannel socketChannel) throws Exception {
  22. socketChannel.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,
  23. 2,0,2));
  24. socketChannel.pipeline().addLast("msgpack译码器",new MsgPackDecoder());
  25. socketChannel.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
  26. socketChannel.pipeline().addLast("msgpack编码器",new MsgPackEncoder());
  27. socketChannel.pipeline().addLast(new EchoClientHandler());
  28. //
  29. }
  30. });
  31. // 发起异步连接操作
  32. ChannelFuture f = b.connect(host,port).sync();
  33. // 等待客户端链路关闭
  34. f.channel().closeFuture().sync();
  35. } finally {
  36. workGroup.shutdownGracefully();
  37. }
  38. }
  39. public static void main(String[] args) throws InterruptedException {
  40. int port = 8080;
  41. new EchoClient().connection(port,"127.0.0.1");
  42. }
  43. }
  44. import io.netty.channel.ChannelHandlerContext;
  45. import io.netty.channel.ChannelInboundHandlerAdapter;
  46. public class EchoClientHandler extends ChannelInboundHandlerAdapter {
  47. private int count;
  48. @Override
  49. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  50. /* User user = getUser();
  51. ctx.writeAndFlush(user);*/
  52. User[] users = getUsers();
  53. for (User u : users) {
  54. System.out.println(u);
  55. ctx.writeAndFlush(u);
  56. }
  57. }
  58. @Override
  59. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  60. System.out.println("这是客户端接收的消息【 " + ++count + " 】时间:【" + msg + "】");
  61. }
  62. @Override
  63. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  64. ctx.flush();
  65. }
  66. @Override
  67. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  68. ctx.close();
  69. }
  70. private User[] getUsers() {
  71. User[] users = new User[5];
  72. for (int i = 0; i < 5; i++) {
  73. User user = new User();
  74. user.setId(String.valueOf(i));
  75. user.setAge(18 + i);
  76. user.setName("迪迦" + i);
  77. user.setSex("男" + String.valueOf(i * 2));
  78. users[i] = user;
  79. }
  80. return users;
  81. }
  82. private User getUser() {
  83. User user = new User();
  84. user.setId("11");
  85. user.setAge(18);
  86. user.setName("张元");
  87. user.setSex("男");
  88. return user;
  89. }
  90. }

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM3NTk4MDEx_size_27_color_FFFFFF_t_70

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM3NTk4MDEx_size_27_color_FFFFFF_t_70 1

发表评论

表情:
评论列表 (有 0 条评论,459人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty解码及protostuff

    Netty编解码是什么? 要想了解编解码,首先要知客户端和服务端是怎么处理信息的,当通过Netty发送或者接收信息的时候,首先要将接收到的消息尽心decode()方法进行