Netty中序列化框架MessagePack的简单实现

ゝ一纸荒年。 2022-02-20 06:15 291阅读 0赞

  MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交换,但是它的性能更快,序列化之后的码流也更小。MessagePack的特点如下:

  1. 编解码高效,性能高;
  2. 序列化之后码流小
  3. 支持跨语言

MessagePack使用

1.依赖

  使用maven构建项目

  1. <dependency>
  2. <groupId>org.msgpack</groupId>
  3. <artifactId>msgpack</artifactId>
  4. <version>0.6.12</version>
  5. </dependency>

2.创建编码和解码器

编码器

  1. /** * @param ctx 上下文 * @param msg 需要编码的对象 * @param out 编码后的数据 */
  2. @Override
  3. protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
  4. MessagePack msgpack = new MessagePack();
  5. // 对对象进行序列化
  6. byte[] raw = msgpack.write(msg);
  7. // 返回序列化的数据
  8. out.writeBytes(raw);
  9. }

解码器

  1. /** * @param ctx 上下文 * @param msg 需要解码的数据 * @param out 解码列表 */
  2. @Override
  3. protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
  4. final byte[] array;
  5. final int length = msg.readableBytes();
  6. array = new byte[length];
  7. // 获取需要解码的字节数组
  8. msg.getBytes(msg.readerIndex(), array,0,length);
  9. MessagePack msgpack = new MessagePack();
  10. // 反序列化并将结果保存到了解码列表中
  11. out.add(msgpack.read(array));
  12. }

3.客户端

EchoClient

  1. /** * MsgPack 编解码器 * * @author 波波烤鸭 * @email dengpbs@163.com * */
  2. public class EchoClient {
  3. public static void main(String[] args) throws Exception {
  4. int port = 8080;
  5. if (args != null && args.length > 0) {
  6. try {
  7. port = Integer.valueOf(args[0]);
  8. } catch (NumberFormatException e) {
  9. // 采用默认值
  10. }
  11. }
  12. new EchoClient().connector(port, "127.0.0.1",10);
  13. }
  14. public void connector(int port, String host,final int sendNumber) throws Exception {
  15. // 配置客户端NIO线程组
  16. EventLoopGroup group = new NioEventLoopGroup();
  17. try {
  18. Bootstrap b = new Bootstrap();
  19. b.group(group).channel(NioSocketChannel.class)
  20. .option(ChannelOption.TCP_NODELAY, true)
  21. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
  22. .handler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel ch) throws Exception {
  25. //这里设置通过增加包头表示报文长度来避免粘包
  26. ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(1024, 0, 2,0,2));
  27. //增加解码器
  28. ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
  29. //这里设置读取报文的包头长度来避免粘包
  30. ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
  31. //增加编码器
  32. ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
  33. // 4.添加自定义的处理器
  34. ch.pipeline().addLast(new EchoClientHandler(sendNumber));
  35. }
  36. });
  37. // 发起异步连接操作
  38. ChannelFuture f = b.connect(host, port).sync();
  39. // 等待客户端链路关闭
  40. f.channel().closeFuture().sync();
  41. }catch(Exception e){
  42. e.printStackTrace();
  43. } finally {
  44. // 优雅退出,释放NIO线程组
  45. group.shutdownGracefully();
  46. }
  47. }
  48. }

EchoClientHandler

  1. /** * DelimiterBasedFrameDecoder 案例 * 自定义处理器 * @author 波波烤鸭 * @email dengpbs@163.com * */
  2. public class EchoServerHandler extends ChannelHandlerAdapter{
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. //UserInfo user = (UserInfo) msg;
  6. System.out.println("server receive the msgpack message :"+msg);
  7. //ctx.writeAndFlush(user);
  8. ctx.writeAndFlush(msg);
  9. }
  10. @Override
  11. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  12. cause.printStackTrace();
  13. ctx.close(); // 发生异常关闭链路
  14. }
  15. }

4.服务端

EchoServer

  1. /** * MsgPack 编解码器 * 服务端 * @author 波波烤鸭 * @email dengpbs@163.com * */
  2. public class EchoServer {
  3. public void bind(int port) throws Exception {
  4. // 配置服务端的NIO线程组
  5. // 服务端接受客户端的连接
  6. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  7. // 进行SocketChannel的网络读写
  8. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  9. try {
  10. ServerBootstrap b = new ServerBootstrap();
  11. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  12. .option(ChannelOption.SO_BACKLOG, 100)
  13. .handler(new LoggingHandler(LogLevel.INFO))
  14. .childHandler(new ChannelInitializer<SocketChannel>() {
  15. @Override
  16. protected void initChannel(SocketChannel ch) throws Exception {
  17. ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
  18. // 添加msgpack的编码和解码器
  19. ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
  20. ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
  21. ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
  22. // 添加自定义的处理器
  23. ch.pipeline().addLast(new EchoServerHandler());
  24. }
  25. });
  26. // 绑定端口,同步等待成功
  27. ChannelFuture f = b.bind(port).sync();
  28. // 等待服务端监听端口关闭
  29. f.channel().closeFuture().sync();
  30. }catch(Exception e){
  31. e.printStackTrace();
  32. } finally {
  33. // 优雅退出,释放线程池资源
  34. bossGroup.shutdownGracefully();
  35. workerGroup.shutdownGracefully();
  36. }
  37. }
  38. public static void main(String[] args) throws Exception {
  39. int port = 8080;
  40. if(args!=null && args.length > 0){
  41. try{
  42. port = Integer.valueOf(args[0]);
  43. }catch(NumberFormatException e){
  44. // 采用默认值
  45. }
  46. }
  47. new EchoServer().bind(port);
  48. }
  49. }

EchoServerHandler

  1. /** * DelimiterBasedFrameDecoder 案例 * 自定义处理器 * @author 波波烤鸭 * @email dengpbs@163.com * */
  2. public class EchoServerHandler extends ChannelHandlerAdapter{
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. //UserInfo user = (UserInfo) msg;
  6. System.out.println("server receive the msgpack message :"+msg);
  7. //ctx.writeAndFlush(user);
  8. ctx.writeAndFlush(msg);
  9. }
  10. @Override
  11. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  12. cause.printStackTrace();
  13. ctx.close(); // 发生异常关闭链路
  14. }
  15. }

5.注意点(POJO)

  消息类上加上注解Message,还有就是必须要有默认的无参构造器

  1. /** * Msgpack 中必须添加@Message注解 及 无参构造方法 * @author 波波烤鸭 * @email dengpbs@163.com * */
  2. @Message
  3. public class UserInfo {
  4. private String name;
  5. private int age;
  6. public String getName() {
  7. return name;
  8. }
  9. public void setName(String name) {
  10. this.name = name;
  11. }
  12. public int getAge() {
  13. return age;
  14. }
  15. public void setAge(int age) {
  16. this.age = age;
  17. }
  18. @Override
  19. public String toString() {
  20. return "UserInfo [name=" + name + ", age=" + age + "]";
  21. }
  22. }

6.测试

服务端输出

  1. server receive the msgpack message :["bobo烤鸭:0",0]
  2. server receive the msgpack message :["bobo烤鸭:1",1]
  3. server receive the msgpack message :["bobo烤鸭:2",2]
  4. server receive the msgpack message :["bobo烤鸭:3",3]
  5. server receive the msgpack message :["bobo烤鸭:4",4]
  6. server receive the msgpack message :["bobo烤鸭:5",5]
  7. server receive the msgpack message :["bobo烤鸭:6",6]
  8. server receive the msgpack message :["bobo烤鸭:7",7]
  9. server receive the msgpack message :["bobo烤鸭:8",8]
  10. server receive the msgpack message :["bobo烤鸭:9",9]

客户端输出

  1. Client receive the msgpack message :["bobo烤鸭:0",0]
  2. Client receive the msgpack message :["bobo烤鸭:1",1]
  3. Client receive the msgpack message :["bobo烤鸭:2",2]
  4. Client receive the msgpack message :["bobo烤鸭:3",3]
  5. Client receive the msgpack message :["bobo烤鸭:4",4]
  6. Client receive the msgpack message :["bobo烤鸭:5",5]
  7. Client receive the msgpack message :["bobo烤鸭:6",6]
  8. Client receive the msgpack message :["bobo烤鸭:7",7]
  9. Client receive the msgpack message :["bobo烤鸭:8",8]
  10. Client receive the msgpack message :["bobo烤鸭:9",9]

至此Netty中就可以通过MessagePack来处理序列化的情况了~

发表评论

表情:
评论列表 (有 0 条评论,291人围观)

还没有评论,来说两句吧...

相关阅读

    相关 基于Netty实现简单RPC框架

            Dubbo采用Netty作为基础通信组件,模仿Dubbo实现简单版的RPC框架。服务消费者和服务提供者约定接口和协议,用于远程调用和TCP请求验证。服务提供者作