初识Netty三(基于MessagePack实现编解码)

迷南。 2023-02-14 01:47 121阅读 0赞

MessagePack介绍

目前主要的编解码:

  1. Java序列化
  2. JBoss Marshalling
  3. XML
  4. JSON
  5. MessagePack
  6. Protobuf

本次编解码是使用 MessagePack实现的,MessagePack主要优点:

  • 高效
  • 跨语言
    官网解释:
    在这里插入图片描述
    更多介绍请自行查看官网

MessagePack 使用

引入依赖

  1. <dependency>
  2. <groupId>org.msgpack</groupId>
  3. <artifactId>msgpack</artifactId>
  4. <version>0.6.12</version>
  5. </dependency>

MessagePack 编解码使用起来非常方便,像这样

  1. @org.junit.Test
  2. public void test() throws Exception{
  3. List<String> list = Lists.newArrayList();
  4. list.add("msgpack");
  5. list.add("kumofs");
  6. list.add("viver");
  7. MessagePack msgpack = new MessagePack();
  8. // Serialize
  9. byte[] raw = msgpack.write(list);
  10. // Deserialize directly using a template(直接使用模板反序列化)
  11. List<String> list1 = msgpack.read(raw, Templates.tList(Templates.TString));
  12. list.forEach(s -> System.out.println(s));
  13. }

基于 MessagePack 编解码器实现

解码器(MsgPackDecoder)

  1. public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
  2. @Override
  3. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
  4. final byte[] array;
  5. final int length = byteBuf.readableBytes();
  6. array = new byte[length];
  7. // 获取数据包 byteBuf中的byte数组
  8. byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length);
  9. MessagePack msgpack = new MessagePack();
  10. //调用 msgpack.read 反序列化然后加入到list中
  11. list.add(msgpack.read(array));
  12. }
  13. }

编码器(MsgPackEncoder)

  1. /** * @author WH * @version 1.0 * @date 2020/5/28 23:49 * @Description MsgPackEncoder 继承 MessageToByteEncoder * 它负责将Object类型的POJO对象编码为byte数组,然后写入到ByteBuf中 */
  2. public class MsgPackEncoder extends MessageToByteEncoder<Object> {
  3. @Override
  4. protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
  5. MessagePack msgpack = new MessagePack();
  6. //Serialize
  7. byte[] raw = msgpack.write(o);
  8. byteBuf.writeBytes(raw);
  9. }
  10. }

客户端实现

EchoClient

  1. public class EchoClient {
  2. public static void main(String[] args) {
  3. EventLoopGroup group = new NioEventLoopGroup();
  4. try {
  5. Bootstrap b = new Bootstrap();
  6. b.group(group).channel(NioSocketChannel.class)
  7. .option(ChannelOption.TCP_NODELAY, true)
  8. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
  9. .handler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. public void initChannel(SocketChannel ch) throws Exception {
  12. ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,
  13. 2,0,2));
  14. ch.pipeline().addLast("msgpack decoder", new MsgPackDecoder());
  15. ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
  16. ch.pipeline().addLast("msgpack encoder", new MsgPackEncoder());
  17. ch.pipeline().addLast(new EchoClientHandler(10));
  18. }
  19. });
  20. //发起异步连接操作
  21. ChannelFuture f = b.connect("127.0.0.1", 8080).sync();
  22. //等待客户端链路关闭
  23. f.channel().closeFuture().sync();
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. } finally {
  27. //优雅退出,释放NIO线程租
  28. group.shutdownGracefully();
  29. }
  30. }
  31. }

EchoClientHandler

  1. @Slf4j
  2. public class EchoClientHandler extends ChannelHandlerAdapter {
  3. private final int sendNumber;
  4. private static List<User> users;
  5. public EchoClientHandler(int sendNumber) {
  6. this.sendNumber = sendNumber;
  7. users = new ArrayList<>(sendNumber);
  8. for (int i = 0; i < sendNumber; i++) {
  9. String name = "阿离" + i;
  10. User user = new User(name, i);
  11. users.add(user);
  12. }
  13. }
  14. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  15. for (User user : users) {
  16. ctx.write(user);
  17. log.info("客户端发送消息为" + user);
  18. ctx.writeAndFlush(user);
  19. }
  20. }
  21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  22. log.info("接收到服务器的消息: {}", msg);
  23. }
  24. @Override
  25. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  26. ctx.flush();
  27. }
  28. @Override
  29. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  30. cause.printStackTrace();
  31. ctx.close();
  32. }
  33. }

服务端实现

EchoServer

  1. public class EchoServer {
  2. public static void main(String[] args) {
  3. //配置服务端的NIO线程租
  4. EventLoopGroup bossGroup = new NioEventLoopGroup();
  5. EventLoopGroup workerGroup = new NioEventLoopGroup();
  6. try {
  7. ServerBootstrap b = new ServerBootstrap();
  8. b.group(bossGroup, workerGroup)
  9. .channel(NioServerSocketChannel.class)
  10. .option(ChannelOption.SO_BACKLOG, 100)
  11. .handler(new LoggingHandler(LogLevel.INFO))
  12. .childHandler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. public void initChannel(SocketChannel ch) throws Exception{
  15. ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,
  16. 2,0,2));
  17. ch.pipeline().addLast(new MsgPackDecoder());
  18. ch.pipeline().addLast(new LengthFieldPrepender(2));
  19. ch.pipeline().addLast(new MsgPackEncoder());
  20. ch.pipeline().addLast(new EchoServerHandler());
  21. }
  22. });
  23. //绑定端口,成功同步等待
  24. ChannelFuture f = b.bind(8080).sync();
  25. //订单服务端监听端口关闭
  26. f.channel().closeFuture().sync();
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. } finally {
  30. //优雅退出,释放线程池资源
  31. bossGroup.shutdownGracefully();
  32. workerGroup.shutdownGracefully();
  33. }
  34. }
  35. }

EchoServerHandler

  1. @Slf4j
  2. public class EchoServerHandler extends ChannelHandlerAdapter {
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. ctx.writeAndFlush(msg);
  6. log.info("服务端接受到的消息为:{}", msg);
  7. }
  8. }

测试

在这里插入图片描述
在这里插入图片描述

源码下载

https://github.com/weihubeats/netty-student

发表评论

表情:
评论列表 (有 0 条评论,121人围观)

还没有评论,来说两句吧...

相关阅读