Netty使用Protobuf编解码

「爱情、让人受尽委屈。」 2022-05-13 10:04 435阅读 0赞

Protobuf是一个灵活、高效、结构化的数据序列化框架,相比于XML等传统的序列化工具,它更小、更快、更简单。Protobuf支持数据结构化一次可以到处使用,甚至跨语言使用,通过代码生成工具可以自动生成不同语言版本的源代码,甚至可以在使用不同版本的数据结构进程间进行数据传递,实现数据结构的前向兼容。

下面结合一个例子看看在Netty如何使用Protobuf对POJO对象进行编解码。

1.Protobuf环境搭建

从https://developers.google.com/protocol-buffers/docs/downloads下载Protobuf,本例中是3.0.0版本。

下载后对压缩包进行解压,在/bin目录可以看到protoc.exe。protoc.exe根据.proto文件生成代码。下面根据图书订购程序定义SubcribeReq.proto和SubcribeResp.proto文件。
SubcribeReq.proto文件内容:

  1. syntax = "proto3";
  2. package netty;
  3. option java_package = "com.tommy.netty.protobuf";
  4. option java_outer_classname = "SubcribeReqProto";
  5. message SubcribeReq { int32 subReqID = 1; string userName = 2; string productName = 3; repeated string address = 4; }

SubcribeResp.proto文件内容:

  1. syntax = "proto3";
  2. package netty;
  3. option java_package = "com.tommy.netty.protobuf";
  4. option java_outer_classname = "SubcribeRespProto";
  5. message SubcribeResp {
  6. int32 subReqID = 1;
  7. int32 respCode = 2;
  8. string desc = 3;
  9. }

通过protoc.exe生成代码。
进入到protoc.exe目录,分别执行:
protoc.exe --java_out=.\src .\netty\SubcribeReq.protoprotoc.exe --java_out=.\src .\netty\SubcribeResp.proto
在/bin目录的src目录中可以看到生成的文件。(注意:要提前创建好src目录)
49128754.jpg
97870455.jpg

将生成的SubcribeReqProto.java和SubcribeRespProto.java复制到工程中。

在工程中导入protobuf3.0.0的包:

  1. <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
  2. <dependency>
  3. <groupId>com.google.protobuf</groupId>
  4. <artifactId>protobuf-java</artifactId>
  5. <version>3.0.0</version>
  6. </dependency>

2.Protobuf编解码开发

  1. /** * @author j.tommy * @version 1.0 * @date 2017/12/10 */
  2. public class TestSubcribeReqProto {
  3. private static byte[] encode(SubcribeReqProto.SubcribeReq subcribeReq) {
  4. return subcribeReq.toByteArray();
  5. }
  6. private static SubcribeReqProto.SubcribeReq decode(byte[] body) throws InvalidProtocolBufferException {
  7. return SubcribeReqProto.SubcribeReq.parseFrom(body);
  8. }
  9. private static SubcribeReqProto.SubcribeReq createSubcribeReq() {
  10. SubcribeReqProto.SubcribeReq.Builder builder = SubcribeReqProto.SubcribeReq.newBuilder();
  11. builder.setSubReqID(1);
  12. builder.setUserName("j.tommy");
  13. builder.setProductName("Netty权威指南");
  14. List<String> addressList = new ArrayList<String>();
  15. addressList.add("北京市");
  16. addressList.add("上海市");
  17. addressList.add("西安市");
  18. addressList.add("深圳市");
  19. builder.addAllAddress(addressList);
  20. return builder.build();
  21. }
  22. public static void main(String[] args) throws InvalidProtocolBufferException {
  23. SubcribeReqProto.SubcribeReq req = createSubcribeReq();
  24. System.out.println("Before encode:" + req);
  25. SubcribeReqProto.SubcribeReq req2 = decode(encode(req));
  26. System.out.println("After decode:" + req2);
  27. System.out.println("Assert equals:" + req.equals(req2));
  28. }
  29. }

通过SubcribeReq的.newBuilder()创建Builder,通过Builder设置SubscribeReq的属性,最后通过builder.builder()方法生成对象。

编码时通过SubscribeReq的toByteArray()即可将SubcribeReq编码为直接数组。
解码时通过SubscribeReq的parseForm将二进制数组编码为SubscribeReq对象。

运行结果:
82964556.jpg

3.Netty的Protobuf订购程序开发

服务端代码:

  1. /** * @author j.tommy * @version 1.0 * @date 2017/12/10 */
  2. public class SubcribeServer {
  3. public static void main(String[] args) {
  4. EventLoopGroup bossGroup = new NioEventLoopGroup();
  5. EventLoopGroup workerGroup = new NioEventLoopGroup();
  6. ServerBootstrap s = new ServerBootstrap();
  7. s.group(bossGroup,workerGroup)
  8. .channel(NioServerSocketChannel.class)
  9. .handler(new LoggingHandler(LogLevel.INFO))
  10. .option(ChannelOption.SO_BACKLOG, 100)
  11. .childHandler(new ChannelInitializer<SocketChannel>() {
  12. @Override
  13. protected void initChannel(SocketChannel sc) throws Exception {
  14. sc.pipeline().addLast(new ProtobufVarint32FrameDecoder());
  15. sc.pipeline().addLast(new ProtobufDecoder(SubcribeReqProto.SubcribeReq.getDefaultInstance()));
  16. sc.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
  17. sc.pipeline().addLast(new ProtobufEncoder());
  18. sc.pipeline().addLast(new SubcribeServerHandler());
  19. }
  20. });
  21. try {
  22. ChannelFuture cf = s.bind(9989).sync();
  23. cf.channel().closeFuture().sync();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. } finally {
  27. bossGroup.shutdownGracefully();
  28. workerGroup.shutdownGracefully();
  29. }
  30. }
  31. }
  32. class SubcribeServerHandler extends ChannelHandlerAdapter {
  33. @Override
  34. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  35. ctx.close();
  36. }
  37. private SubcribeRespProto.SubcribeResp createSubcribeResp(int subReqID) {
  38. SubcribeRespProto.SubcribeResp.Builder builder = SubcribeRespProto.SubcribeResp.newBuilder();
  39. builder.setSubReqID(subReqID);
  40. builder.setRespCode(0);
  41. builder.setDesc("Order success.");
  42. return builder.build();
  43. }
  44. @Override
  45. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  46. SubcribeReqProto.SubcribeReq req = (SubcribeReqProto.SubcribeReq) msg;
  47. System.out.println("接收到客户端请求:" + req.getSubReqID() + ",userName:" + req.getUserName() + ",productName:" + req.getProductName());
  48. SubcribeRespProto.SubcribeResp resp = createSubcribeResp(req.getSubReqID());
  49. ctx.writeAndFlush(resp);
  50. }
  51. }

客户端代码:

  1. /** * @author j.tommy * @version 1.0 * @date 2017/12/10 */
  2. public class SubcribleClient {
  3. public static void main(String[] args) {
  4. EventLoopGroup group = new NioEventLoopGroup();
  5. Bootstrap b = new Bootstrap();
  6. b.group(group)
  7. .channel(NioSocketChannel.class)
  8. .option(ChannelOption.TCP_NODELAY, true)
  9. .handler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. protected void initChannel(SocketChannel sc) throws Exception {
  12. sc.pipeline().addLast(new ProtobufVarint32FrameDecoder());
  13. sc.pipeline().addLast(new ProtobufDecoder(SubcribeRespProto.SubcribeResp.getDefaultInstance()));
  14. sc.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
  15. sc.pipeline().addLast(new ProtobufEncoder());
  16. sc.pipeline().addLast(new SubcribleClientHandler());
  17. }
  18. });
  19. try {
  20. ChannelFuture f = b.connect("127.0.0.1", 9989).sync();
  21. f.channel().closeFuture().sync();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. } finally {
  25. group.shutdownGracefully();
  26. }
  27. }
  28. }
  29. class SubcribleClientHandler extends ChannelHandlerAdapter {
  30. @Override
  31. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  32. cause.printStackTrace();
  33. ctx.close();
  34. }
  35. @Override
  36. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  37. SubcribeReqProto.SubcribeReq req = null;
  38. for (int i=0;i<10;i++) {
  39. req = createSubcribeReq(i);
  40. ctx.write(req);
  41. }
  42. ctx.flush();
  43. }
  44. private SubcribeReqProto.SubcribeReq createSubcribeReq(int subSeqID) {
  45. SubcribeReqProto.SubcribeReq.Builder builder = SubcribeReqProto.SubcribeReq.newBuilder();
  46. builder.setSubReqID(subSeqID);
  47. builder.setUserName("j.tommy");
  48. builder.setProductName("netty权威指南");
  49. List<String> addressList = new ArrayList<String>();
  50. addressList.add("北京市");
  51. addressList.add("西安市");
  52. addressList.add("深圳市");
  53. return builder.build();
  54. }
  55. @Override
  56. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  57. SubcribeRespProto.SubcribeResp resp = (SubcribeRespProto.SubcribeResp) msg;
  58. System.out.println("接收到服务端响应:" + resp.getSubReqID() + ",responseCode:" + resp.getRespCode() + ",desc:" + resp.getDesc());
  59. }
  60. }

ProtobufVarint32FrameDecoder,它主要用来处理半包;

ProtobufDecoder解压器,它的参数是com.google.protobuf.MessageLite,实际上是告诉ProtobufDecoder需要解码的目标类是什么,否则仅仅从字节数组是无法知道要解码的目标类型信息的。

服务端中ProtobufEncoder用于对响应的SubcribeResp进行编码。

运行结果:
服务端:
42748982.jpg
客户端:
56553006.jpg

4.Protobuf的使用注意事项

ProtobufDecoder仅仅负责解码,它不支持读半包。因此在ProtobufDecoder的前面,一定要有能够处理半包消息的解码器。
有3种方式可以选择:
1.使用Netty提供的ProtobufVarint32FrameDecoder,它可以处理半包消息;
2.继承Netty提供的通用半包解码器LengthFieldBasedFrameDecoder;
3.继承ByteToMessageDecoder类,自己处理半包消息。

如果只使用ProtobufDecoder解码器,而忽略对半包消息的处理,程序没法正常工作。

参考《Netty权威指南》

发表评论

表情:
评论列表 (有 0 条评论,435人围观)

还没有评论,来说两句吧...

相关阅读

    相关 ProtoBuf 解码

    在网络程序开发过程中,数据在网络中是以二进制字节码的格式传输的,在发送业务数据前,先对业务数据进行编码,经过网络传输,收到数据后对数据进行解码,Netty提供了一些编解码器,它