netty编解码器与序列化框架分析

分手后的思念是犯贱 2022-12-20 03:30 213阅读 0赞

netty编解码器分析

编码(Encode)也称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。

反之,解码(Decode)也称为反序列化(deserialization),用于把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。

进行远程跨进程服务调用时(例如 RPC 调用),需要使用特定的编解码技术,对需要进行网络传输的对象做编码或者解码,以便完成远程调用。

Java序列化

Java默认提供的序列化机制,需要序列化的Java对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInputStream和java.io.ObjectOutputStrem序列化和反序列化。

但是由于它自身存在很多缺点,因此大多数的RPC框架并没有选择它。Java序列化的主要缺点如下:

  1. 无法跨语言:是Java序列化最致命的问题。对于跨进程的服务调用,服务提供者可能会使用C++或者其它语言开发,当我们需要和异构语言进程交互时,Java序列化就难以胜任。由于Java序列化技术是Java语言内部的私有协议,其它语言并不支持,对于用户来说它完全是黑盒。Java序列化后的字节数组,别的语言无法进行反序列化,这就严重阻碍了它的应用范围。
  2. 序列化后的码流太大,例如使用二进制编解码技术对同一个复杂的POJO对象进行编码,它的码流仅仅为Java序列化之后的20%左右;目前主流的编解码框架,序列化之后的码流都远远小于原生的Java序列化。
  3. 序列化性能太低。

在netty中使用java的序列化,只需要添加编码器ObjectEncoder和解码器ObjectDecoder即可,netty都封装好了。

服务器端代码:

  1. ServerBootstrap b = new ServerBootstrap();
  2. b.group(bossGroup, workerGroup)
  3. .channel(NioServerSocketChannel.class)
  4. .childHandler(new ChannelInitializer<SocketChannel>() {
  5. @Override
  6. public void initChannel(SocketChannel ch) throws Exception {
  7. ch.pipeline().addLast(new ObjectEncoder()); // 编码
  8. ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); // 解码
  9. ch.pipeline().addLast(new ServerHandler());
  10. }
  11. });

服务器端业务代码:

  1. public class ServerHandler extends SimpleChannelInboundHandler {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
  4. System.out.println("receive from client: " + msg);
  5. UserResponse response = new UserResponse();
  6. response.setCode(200);
  7. response.setMessage("success");
  8. ctx.writeAndFlush(response);
  9. }
  10. }

思考:不需要解决粘包半包问题?ObjectEncoder和ObjectDecoder中已经对粘包半包问题进行了处理,使用的是自定义消息长度的方式,下面看源代码。

io.netty.handler.codec.serialization.ObjectEncoder的源代码:

  1. public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
  2. private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
  3. @Override
  4. protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
  5. int startIdx = out.writerIndex();
  6. ByteBufOutputStream bout = new ByteBufOutputStream(out);
  7. ObjectOutputStream oout = null;
  8. try {
  9. bout.write(LENGTH_PLACEHOLDER); // 先占位4个字节
  10. oout = new CompactObjectOutputStream(bout);
  11. oout.writeObject(msg);
  12. oout.flush();
  13. } finally {
  14. if (oout != null) {
  15. oout.close();
  16. } else {
  17. bout.close();
  18. }
  19. }
  20. int endIdx = out.writerIndex();
  21. out.setInt(startIdx, endIdx - startIdx - 4); // 计算报文的长度,放在开始的4个字节里面
  22. }
  23. }

io.netty.handler.codec.serialization.ObjectDecoder解码器的源代码:

  1. public class ObjectDecoder extends LengthFieldBasedFrameDecoder {
  2. ... ...
  3. public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
  4. super(maxObjectSize, 0, 4, 0, 4);
  5. this.classResolver = classResolver;
  6. }
  7. ... ...
  8. }

发现ObjectDecoder继承LengthFieldBasedFrameDecoder,构造方法中调用了父类的构造方法super(maxObjectSize, 0, 4, 0, 4);,指定了解码时前4个字节为报文的长度,解码后丢弃前面的4个字节,传递给后面的Handler。

Google的Protobuf

Protobuf全称Google Protocol Buffers,它由谷歌开源而来,在谷歌内部久经考验。它将数据结构以.proto 文件进行描述,通过代码生成工具可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性。

它的特点如下:

  1. 结构化数据存储格式(XML,JSON等)。
  2. 高效的编解码性能。
  3. 语言无关、平台无关、扩展性好。

使用方法如下:

先下载对应操作系统的工具,下载地址:https://github.com/protocolbuffers/protobuf/releases,这里使用的是`protoc-3.13.0-win64.zip`,解压缩后bin目录下有个protoc.exe。

编写UserResponse.proto文件:

  1. syntax = "proto3";
  2. option java_package = "com.morris.netty.serialize.protobuf";
  3. option java_outer_classname = "UserResponseProto";
  4. message UserResponse {
  5. int32 code = 1;
  6. string message = 2;
  7. }

然后使用protoc.exe生成对应的实体类:

  1. > protoc.exe --java_out=. UserResponse.proto

把生成的实体类,拷贝到工程对应的目录。

项目中引入protobuf的maven依赖:

  1. dependency>
  2. <groupId>com.google.protobuf</groupId>
  3. <artifactId>protobuf-java</artifactId>
  4. <version>3.6.1</version>
  5. </dependency>

服务器端代码:

  1. ServerBootstrap b = new ServerBootstrap();
  2. b.group(bossGroup, workerGroup)
  3. .channel(NioServerSocketChannel.class)
  4. .childHandler(new ChannelInitializer<SocketChannel>() {
  5. @Override
  6. public void initChannel(SocketChannel ch) throws Exception {
  7. ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 处理粘包半包的解码器
  8. ch.pipeline().addLast(new ProtobufDecoder(UserRequestProto.UserRequest.getDefaultInstance())); // protobuf解码器
  9. ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 处理粘包半包的编码器
  10. ch.pipeline().addLast(new ProtobufEncoder()); // protobuf编码器
  11. ch.pipeline().addLast(new ServerHandler()); // 具体业务处理
  12. }
  13. });

服务器端业务代码:

  1. public class ServerHandler extends SimpleChannelInboundHandler {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. System.out.println("receive from client: " + msg);
  5. UserResponseProto.UserResponse response = UserResponseProto.UserResponse.newBuilder().setCode(200).setMessage("success").buildPartial();
  6. ctx.writeAndFlush(response);
  7. }
  8. }

JBoss Marshalling

JBoss Marshalling是一个Java对象的序列化API包,修正了JDK自带的序列化包的很多问题,但又保持跟java.io.Serializable接口的兼容;同时增加了一些可调的参数和附加的特性,并且这些参数和特性可通过工厂类进行配置。

相比于传统的Java序列化机制,它的优点如下:

  1. 可插拔的类解析器,提供更加便捷的类加载定制策略,通过一个接口即可实现定制;
  2. 可插拔的对象替换技术,不需要通过继承的方式;
  3. 可插拔的预定义类缓存表,可以减小序列化的字节数组长度,提升常用类型的对象序列化性能;
  4. 无须实现java.io.Serializable接口,即可实现Java序列化;
  5. 通过缓存技术提升对象的序列化性能。

使用方法如下:

引入marshalling的maven依赖:

  1. <dependency>
  2. <groupId>org.jboss.marshalling</groupId>
  3. <artifactId>jboss-marshalling-serial</artifactId>
  4. <version>2.0.6.Final</version>
  5. </dependency>

创建一个生产marshalling编码器和marshalling解码器的工厂类:

  1. package com.morris.netty.serialize.marshalling;
  2. import io.netty.handler.codec.marshalling.*;
  3. import org.jboss.marshalling.MarshallerFactory;
  4. import org.jboss.marshalling.Marshalling;
  5. import org.jboss.marshalling.MarshallingConfiguration;
  6. public final class MarshallingCodeCFactory {
  7. public static MarshallingDecoder buildMarshallingDecoder() {
  8. final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
  9. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  10. configuration.setVersion(5);
  11. UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
  12. MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
  13. return decoder;
  14. }
  15. public static MarshallingEncoder buildMarshallingEncoder() {
  16. final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
  17. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  18. configuration.setVersion(5);
  19. MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
  20. MarshallingEncoder encoder = new MarshallingEncoder(provider);
  21. return encoder;
  22. }
  23. }

服务器端代码:

  1. ServerBootstrap b = new ServerBootstrap();
  2. b.group(bossGroup, workerGroup)
  3. .channel(NioServerSocketChannel.class)
  4. .childHandler(new ChannelInitializer<SocketChannel>() {
  5. @Override
  6. public void initChannel(SocketChannel ch) throws Exception {
  7. ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); // 解码器
  8. ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); // 编码器
  9. ch.pipeline().addLast(new ServerHandler());
  10. }
  11. });

这里不需要处理粘包和半包问题的原因与java序列化不需要处理的原因一致,具体可以参考MarshallingEncoder和MarshallingDecoder的源代码。

服务器端业务代码与java序列化的服务器端业务代码一致。

相比于前面介绍的两种编解码框架,JBoss Marshalling更多是在JBoss内部使用,应用范围有限,netty是JBoss公司的,所以netty要支持一下自己家的产品。

自定义编解码器MessagePack

编码器相关基类:

  • 将消息编码为字节:MessageToByteEncoder
  • 将消息编码为消息:MessageToMessageEncoder,T代表源数据的类型。

解码器相关基类:

  • 将字节解码为消息:ByteToMessageDecoder。
  • 将一种消息类型解码为另一种消息类型:MessageToMessageDecoder。

编解码器类:

  • ByteToMessageCodec。
  • MessageToMessageCodec。

下面通过自定义编码器和解码器,将MessagePack作为序列化框架嵌入到项目中来:

引入MessagePack的maven依赖:

  1. <dependency>
  2. <groupId>org.msgpack</groupId>
  3. <artifactId>msgpack</artifactId>
  4. <version>0.6.12</version>
  5. </dependency>

编码器:

  1. package com.morris.netty.serialize.messagepack;
  2. import com.morris.netty.serialize.pojo.UserRequest;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.handler.codec.MessageToByteEncoder;
  6. import org.msgpack.MessagePack;
  7. public class MessagePackEncoder extends MessageToByteEncoder<UserRequest> {
  8. @Override
  9. protected void encode(ChannelHandlerContext ctx, UserRequest msg, ByteBuf out)
  10. throws Exception {
  11. MessagePack messagePack = new MessagePack();
  12. byte[] raw = messagePack.write(msg);
  13. out.writeBytes(raw);
  14. }
  15. }

解码器:

  1. package com.morris.netty.serialize.messagepack;
  2. import com.morris.netty.serialize.pojo.UserRequest;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.handler.codec.MessageToMessageDecoder;
  6. import org.msgpack.MessagePack;
  7. import java.io.Serializable;
  8. import java.util.List;
  9. public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {
  10. @Override
  11. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
  12. throws Exception {
  13. byte[] bytes = new byte[in.readableBytes()];
  14. in.readBytes(bytes);
  15. MessagePack messagePack = new MessagePack();
  16. Serializable serializable = messagePack.read(bytes, UserRequest.class);
  17. out.add(serializable);
  18. }
  19. }

实体类,注意加上@Message注解:

  1. package com.morris.netty.serialize.pojo;
  2. import lombok.Data;
  3. import org.msgpack.annotation.Message;
  4. import java.io.Serializable;
  5. @Message
  6. @Data
  7. public class UserRequest implements Serializable {
  8. private int age;
  9. private String name;
  10. }

服务器端启动类关键代码:

  1. ServerBootstrap b = new ServerBootstrap();
  2. b.group(bossGroup, workerGroup)
  3. .channel(NioServerSocketChannel.class)
  4. .childHandler(new ChannelInitializer<SocketChannel>() {
  5. @Override
  6. public void initChannel(SocketChannel ch) throws Exception {
  7. ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); // 解决粘包半包问题
  8. ch.pipeline().addLast(new MessagePackDecoder()); // 解码器
  9. ch.pipeline().addLast(new ServerHandler());
  10. }
  11. });

服务器端业务处理类:

  1. package com.morris.netty.serialize.messagepack;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. public class ServerHandler extends SimpleChannelInboundHandler {
  6. @Override
  7. protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
  8. System.out.println("receive from client: " + msg);
  9. ctx.writeAndFlush(Unpooled.copiedBuffer("success\n".getBytes()));
  10. ctx.fireChannelRead(msg);
  11. }
  12. }

客户端启动类关键代码:

  1. Bootstrap b = new Bootstrap();
  2. b.group(workerGroup)
  3. .channel(NioSocketChannel.class)
  4. .handler(new ChannelInitializer<SocketChannel>() {
  5. @Override
  6. public void initChannel(SocketChannel ch) throws Exception {
  7. ch.pipeline().addLast(new LengthFieldPrepender(2)); // 解决粘包半包问题
  8. // 服务器端发送过来的是以换行符分割的文本,所以这里使用LineBasedFrameDecoder处理粘包半包问题
  9. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  10. ch.pipeline().addLast(new MessagePackEncoder()); // 编码器
  11. ch.pipeline().addLast(new ClientHandler());
  12. }
  13. });

客户端业务处理类:

  1. package com.morris.netty.serialize.messagepack;
  2. import com.morris.netty.serialize.pojo.UserRequest;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. public class ClientHandler extends SimpleChannelInboundHandler {
  7. @Override
  8. public void channelActive(ChannelHandlerContext ctx) {
  9. for (int i = 0; i < 100; i++) {
  10. UserRequest request = new UserRequest();
  11. request.setAge(i);
  12. request.setName("morris");
  13. ctx.writeAndFlush(request);
  14. }
  15. }
  16. @Override
  17. protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
  18. ByteBuf receiveByteBuf = (ByteBuf) msg;
  19. byte[] bytes = new byte[receiveByteBuf.readableBytes()];
  20. receiveByteBuf.readBytes(bytes);
  21. System.out.println("receive from server: " + new String(bytes));
  22. }
  23. }

当然,除了上述介绍的编解码框架和技术之外,比较常用的还有kryo、hession和Json等。

发表评论

表情:
评论列表 (有 0 条评论,213人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty 解码器

    Netty 编解码 在网络中数据是以字节码二进制的形式传输的,所以我们在使用 Netty 传输数据的时候,需要将我们传输的数据转换为 二进制的形式 进行传输,所以不管是我

    相关 Netty解码器框架(十一)

    今天分享Netty编解码器框架 一、什么是编解码器 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何 将其和目标应用程序的数据格式做相互转换。

    相关 Netty(解码器框架)

        每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器  由编码器和解

    相关 Netty——解码器

    什么是编解码器 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和 目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器