Netty协议之 Protocol Buffers (三)

待我称王封你为后i 2023-02-21 11:41 115阅读 0赞

前言

本章将会介绍如何使用Netty搭建一个支持Protocol Buffers协议的服务器,提供支持多数据/消息结构体的解析方法。

Protocol Buffers 协议

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

PB文件的生成

编写PB文件

首先我们先定义好传输协议的结构。

NettyProtobuf.proto

  1. syntax = "proto3";
  2. //生成java类的package包名
  3. option java_package = "com.chenws.netty.protobuf.proto";
  4. //生成java类的类名
  5. option java_outer_classname="NettyProtobuf";
  6. //定义一个people
  7. message People {
  8. //姓名
  9. string name = 1;
  10. //年龄
  11. int32 age = 2;
  12. //性别
  13. string gender = 3;
  14. //地址
  15. string address = 4;
  16. }

我们使用Protocol Buffers 提供的一个exe文件来生成PB格式的java类。
可以在windows cmd上执行命令

  1. protoc --java_out=生成类文件名录 pb文件名(这里为NettyProtobuf.proto
  2. ## 例如:
  3. protoc --java_out=./ ./NettyProtobuf.proto

exe文件可见:protoc.exe
这样我们就生成了一个类名为 NettyProtobuf.java 的java类。

Netty 支持

导入依赖包

  1. // gradle
  2. compile group: 'io.netty', name: 'netty-all', version: '4.1.50.Final'
  3. // maven
  4. <dependency>
  5. <groupId>io.netty</groupId>
  6. <artifactId>netty-all</artifactId>
  7. <version>4.1.50.Final</version>
  8. </dependency>

Server 服务端

编写启动类

与之前TCP文章一样,不同的地方只是针对channel的初始化。

  1. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  2. EventLoopGroup workerGroup = new NioEventLoopGroup(6);
  3. try {
  4. ServerBootstrap serverBootstrap = new ServerBootstrap();
  5. serverBootstrap.group(bossGroup,workerGroup)
  6. .channel(NioServerSocketChannel.class)
  7. .option(ChannelOption.SO_BACKLOG, 1024)
  8. .childOption(ChannelOption.SO_KEEPALIVE, true)
  9. .childHandler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. protected void initChannel(SocketChannel socketChannel) throws Exception {
  12. //添加自定义处理器
  13. socketChannel.pipeline()
  14. .addLast(new ProtobufVarint32FrameDecoder())
  15. .addLast(new ProtobufDecoder(NettyProtobuf.People.getDefaultInstance()))
  16. .addLast(new ProtobufVarint32LengthFieldPrepender())
  17. .addLast(new ProtobufEncoder())
  18. .addLast(new PBNettyHandler());
  19. }
  20. });
  21. int port = 8082;
  22. //监听器,当服务绑定成功后执行
  23. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  24. //监听器,当停止服务后执行。
  25. channelFuture.channel().closeFuture().sync();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. } finally {
  29. bossGroup.shutdownGracefully();
  30. workerGroup.shutdownGracefully();
  31. }
  • bossGroup 里面初始化了一条线程(这里就是NioEventLoop,包含一个NIO SELECTOR,一个队列,一条线程),其作用就是负责处理所有客户端连接的请求,创建新连接(channel),然后注册到workerGroup 的其中一个NioEventLoop。之后该channel的所有读/写操作都由与Channel绑定的NioEventLoop来处理。
  • workerGroup 如上所说,主要是负责处理读/写操作,这里初始化了6条线程(NioEventLoop)去执行,不设置的话默认会根据CPU核数得到一个合适的数目。
  • Channel的整个生命周期都由同一个NioEventLoop处理。
  • new ProtobufDecoder(NettyProtobuf.People.getDefaultInstance()) 这里指定了解码器的类型为People(上面用exe生成的java类),因此这样只能解码一种类型的数据。本文章会提供支持多种数据结构体类型的方法,请往下看。

handler 处理器

  1. public class PBNettyHandler extends SimpleChannelInboundHandler<NettyProtobuf.People> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, NettyProtobuf.People msg) throws Exception {
  4. System.out.println(msg);
  5. //把消息写到缓冲区
  6. ctx.write(msg);
  7. }
  8. @Override
  9. public void channelReadComplete(ChannelHandlerContext ctx) {
  10. //刷新缓冲区,把消息发出去
  11. ctx.flush();
  12. }
  13. @Override
  14. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  15. //异常处理,如果该handler不处理,将会传递到下一个handler
  16. cause.printStackTrace();
  17. }
  18. @Override
  19. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  20. ctx.writeAndFlush("connect success!");
  21. }
  22. }

到此服务器已经完成。

Client 客户端

启动类

跟服务端基本一样,这里直接贴上代码。

  1. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  2. try {
  3. Bootstrap bootstrap = new Bootstrap();
  4. bootstrap.group(bossGroup)
  5. .channel(NioSocketChannel.class)
  6. .handler(new ChannelInitializer<SocketChannel>() {
  7. @Override
  8. protected void initChannel(SocketChannel socketChannel) throws Exception {
  9. //添加自定义处理器
  10. socketChannel.pipeline()
  11. .addLast(new ProtobufVarint32FrameDecoder())
  12. .addLast(new ProtobufDecoder(NettyProtobuf.People.getDefaultInstance()))
  13. .addLast(new ProtobufVarint32LengthFieldPrepender())
  14. .addLast(new ProtobufEncoder())
  15. .addLast(new PBClientHandler());
  16. }
  17. });
  18. ChannelFuture channelFuture = bootstrap.connect("localhost",8082).sync();
  19. //监听器,当停止服务后执行。
  20. channelFuture.channel().closeFuture().sync();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. } finally {
  24. bossGroup.shutdownGracefully();
  25. }

handler处理器

  1. public class PBClientHandler extends SimpleChannelInboundHandler<NettyProtobuf.People> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, NettyProtobuf.People msg) throws Exception {
  4. System.out.println("客户端收到信息:" + msg.getAddress());
  5. }
  6. @Override
  7. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  8. NettyProtobuf.People build = NettyProtobuf.People.newBuilder().setAddress("广州市").setAge(50).setGender("男").setName("小白").build();
  9. ctx.writeAndFlush(build);
  10. }
  11. }

处理器可见当与服务器创建通道后,就会往服务端发送一条数据。

  1. NettyProtobuf.People build = NettyProtobuf.People.newBuilder().setAddress("广州市").setAge(50).setGender("男").setName("小白").build();

服务端收到后,会打印消息并且原样返回给客户端。

到此,Netty对PB的支持已经介绍完了,但我们希望服务端能提高兼容性,可以兼容多种数据结构。如果使用以上的方法,则需要不同的结构体需要跑多个server,这显然不实际。下面一起看看多数据结构传输的用法。

支持多结构体协议

编写PB文件

  1. syntax = "proto3";
  2. option java_package = "com.chenws.netty.protobuf.proto";
  3. option java_outer_classname="NettyMulProtobuf";
  4. message MultipleMessage {
  5. enum DataType {
  6. StudentType = 0;
  7. SchoolType = 1;
  8. }
  9. DataType data_type = 1;
  10. oneof dataBody {
  11. Student student = 2;
  12. School school = 3;
  13. }
  14. }
  15. message Student {
  16. string name = 1;
  17. int32 age = 2;
  18. string address = 3;
  19. }
  20. message School {
  21. string school_name = 1;
  22. string school_address = 2;
  23. }
  • 定义了一个枚举 DataType,用来判断使用的是哪种数据结构体
  • oneof 的作用是指定dataBody中的字段最多只有一个能被赋值。
  • MultipleMessage 由枚举DataType以及dataBody组成。
  • Student和School为具体的数据结构,为dataBody的组成部分

一样,生成它对应的java文件。

Server服务端

  1. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  2. EventLoopGroup workerGroup = new NioEventLoopGroup(6);
  3. try {
  4. ServerBootstrap serverBootstrap = new ServerBootstrap();
  5. serverBootstrap.group(bossGroup,workerGroup)
  6. .channel(NioServerSocketChannel.class)
  7. .option(ChannelOption.SO_BACKLOG, 1024)
  8. .childOption(ChannelOption.SO_KEEPALIVE, true)
  9. .childHandler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. protected void initChannel(SocketChannel socketChannel) throws Exception {
  12. //添加自定义处理器
  13. socketChannel.pipeline()
  14. .addLast(new ProtobufVarint32FrameDecoder())
  15. .addLast(new ProtobufDecoder(NettyMulProtobuf.MultipleMessage.getDefaultInstance()))
  16. .addLast(new ProtobufVarint32LengthFieldPrepender())
  17. .addLast(new ProtobufEncoder())
  18. .addLast(new PBMulNettyHandler());
  19. }
  20. });
  21. int port = 8082;
  22. //监听器,当服务绑定成功后执行
  23. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  24. //监听器,当停止服务后执行。
  25. channelFuture.channel().closeFuture().sync();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. } finally {
  29. bossGroup.shutdownGracefully();
  30. workerGroup.shutdownGracefully();
  31. }

注意new ProtobufDecoder(NettyMulProtobuf.MultipleMessage.getDefaultInstance()),指定了数据解码类型为MultipleMessage。

Server handler

  1. public class PBMulNettyHandler extends SimpleChannelInboundHandler<NettyMulProtobuf.MultipleMessage> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, NettyMulProtobuf.MultipleMessage msg) throws Exception {
  4. //@1
  5. int dataType = msg.getDataType().getNumber();
  6. //@2
  7. if(dataType == 0){
  8. NettyMulProtobuf.Student student = msg.getStudent();
  9. System.out.println(student.getName() + "," + student.getAge() + "," + student.getAddress());
  10. //把消息写到缓冲区
  11. ctx.write(msg);
  12. } else if (dataType == 1){
  13. NettyMulProtobuf.School school = msg.getSchool();
  14. System.out.println(school.getSchoolName() + "," + school.getSchoolAddress());
  15. //把消息写到缓冲区
  16. ctx.write(msg);
  17. }
  18. }
  19. @Override
  20. public void channelReadComplete(ChannelHandlerContext ctx) {
  21. //刷新缓冲区,把消息发出去
  22. ctx.flush();
  23. }
  24. @Override
  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  26. //异常处理,如果该handler不处理,将会传递到下一个handler
  27. cause.printStackTrace();
  28. }
  29. @Override
  30. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  31. ctx.writeAndFlush("connect success!");
  32. }
  33. }
  • @1 得到数据类型,0为Student,1为School。
  • @2 判断类型,来获取正确的dataBody

Client handler

  1. public class PBMulClientHandler extends SimpleChannelInboundHandler<NettyMulProtobuf.MultipleMessage> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, NettyMulProtobuf.MultipleMessage msg) throws Exception {
  4. System.out.println("客户端收到信息:" + msg.getStudent().getAddress());
  5. }
  6. @Override
  7. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  8. //组装Student数据
  9. NettyMulProtobuf.MultipleMessage student = NettyMulProtobuf.MultipleMessage.newBuilder()
  10. .setDataType(NettyMulProtobuf.MultipleMessage.DataType.StudentType).setStudent(NettyMulProtobuf.Student.newBuilder().setAddress("student address")
  11. .setAge(10).setName("student name").build()).build();
  12. //组装School数据
  13. NettyMulProtobuf.MultipleMessage school = NettyMulProtobuf.MultipleMessage.newBuilder()
  14. .setDataType(NettyMulProtobuf.MultipleMessage.DataType.SchoolType).setSchool(
  15. NettyMulProtobuf.School.newBuilder().setSchoolName("school name")
  16. .setSchoolAddress("school address")
  17. .build()
  18. ).build();
  19. ctx.write(student);
  20. ctx.write(school);
  21. ctx.flush();
  22. }
  23. }

Client handler 包装了不同的数据体发送到服务端。

到此,我们就可以实现支持多种数据结构的解析。

总结

Netty具体的组件或用法可以参考我其他文章。

协议系列文章:

Netty协议之 TCP(一)
Netty协议之 UDP(二)

框架系列文章:

Netty学习(一)Netty的概念及体系结构

源码可以见:Netty-Learn

发表评论

表情:
评论列表 (有 0 条评论,115人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty序列化协议Protocol buff

    序列化协议 序列化和反序列化 把对象转换为字节序列的过程称为对象的序列化,把字节序列恢复为对象的过程称为对象的反序列化。用途:文件的copy、网络数据的传输 Pro