Netty协议之 Protocol Buffers (三)
前言
本章将会介绍如何使用Netty搭建一个支持Protocol Buffers协议的服务器,提供支持多数据/消息结构体的解析方法。
Protocol Buffers 协议
Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
PB文件的生成
编写PB文件
首先我们先定义好传输协议的结构。
NettyProtobuf.proto
syntax = "proto3";
//生成java类的package包名
option java_package = "com.chenws.netty.protobuf.proto";
//生成java类的类名
option java_outer_classname="NettyProtobuf";
//定义一个people
message People {
//姓名
string name = 1;
//年龄
int32 age = 2;
//性别
string gender = 3;
//地址
string address = 4;
}
我们使用Protocol Buffers 提供的一个exe文件来生成PB格式的java类。
可以在windows cmd上执行命令
protoc --java_out=生成类文件名录 pb文件名(这里为NettyProtobuf.proto)
## 例如:
protoc --java_out=./ ./NettyProtobuf.proto
exe文件可见:protoc.exe
这样我们就生成了一个类名为 NettyProtobuf.java 的java类。
Netty 支持
导入依赖包
// gradle
compile group: 'io.netty', name: 'netty-all', version: '4.1.50.Final'
// maven
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
Server 服务端
编写启动类
与之前TCP文章一样,不同的地方只是针对channel的初始化。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(6);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加自定义处理器
socketChannel.pipeline()
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(NettyProtobuf.People.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new PBNettyHandler());
}
});
int port = 8082;
//监听器,当服务绑定成功后执行
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//监听器,当停止服务后执行。
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
- bossGroup 里面初始化了一条线程(这里就是NioEventLoop,包含一个NIO SELECTOR,一个队列,一条线程),其作用就是负责处理所有客户端连接的请求,创建新连接(channel),然后注册到workerGroup 的其中一个NioEventLoop。之后该channel的所有读/写操作都由与Channel绑定的NioEventLoop来处理。
- workerGroup 如上所说,主要是负责处理读/写操作,这里初始化了6条线程(NioEventLoop)去执行,不设置的话默认会根据CPU核数得到一个合适的数目。
- Channel的整个生命周期都由同一个NioEventLoop处理。
- new ProtobufDecoder(NettyProtobuf.People.getDefaultInstance()) 这里指定了解码器的类型为People(上面用exe生成的java类),因此这样只能解码一种类型的数据。本文章会提供支持多种数据结构体类型的方法,请往下看。
handler 处理器
public class PBNettyHandler extends SimpleChannelInboundHandler<NettyProtobuf.People> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyProtobuf.People msg) throws Exception {
System.out.println(msg);
//把消息写到缓冲区
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//刷新缓冲区,把消息发出去
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常处理,如果该handler不处理,将会传递到下一个handler
cause.printStackTrace();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("connect success!");
}
}
到此服务器已经完成。
Client 客户端
启动类
跟服务端基本一样,这里直接贴上代码。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bossGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加自定义处理器
socketChannel.pipeline()
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(NettyProtobuf.People.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new PBClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost",8082).sync();
//监听器,当停止服务后执行。
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
}
handler处理器
public class PBClientHandler extends SimpleChannelInboundHandler<NettyProtobuf.People> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyProtobuf.People msg) throws Exception {
System.out.println("客户端收到信息:" + msg.getAddress());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyProtobuf.People build = NettyProtobuf.People.newBuilder().setAddress("广州市").setAge(50).setGender("男").setName("小白").build();
ctx.writeAndFlush(build);
}
}
处理器可见当与服务器创建通道后,就会往服务端发送一条数据。
NettyProtobuf.People build = NettyProtobuf.People.newBuilder().setAddress("广州市").setAge(50).setGender("男").setName("小白").build();
服务端收到后,会打印消息并且原样返回给客户端。
到此,Netty对PB的支持已经介绍完了,但我们希望服务端能提高兼容性,可以兼容多种数据结构。如果使用以上的方法,则需要不同的结构体需要跑多个server,这显然不实际。下面一起看看多数据结构传输的用法。
支持多结构体协议
编写PB文件
syntax = "proto3";
option java_package = "com.chenws.netty.protobuf.proto";
option java_outer_classname="NettyMulProtobuf";
message MultipleMessage {
enum DataType {
StudentType = 0;
SchoolType = 1;
}
DataType data_type = 1;
oneof dataBody {
Student student = 2;
School school = 3;
}
}
message Student {
string name = 1;
int32 age = 2;
string address = 3;
}
message School {
string school_name = 1;
string school_address = 2;
}
- 定义了一个枚举 DataType,用来判断使用的是哪种数据结构体
- oneof 的作用是指定dataBody中的字段最多只有一个能被赋值。
- MultipleMessage 由枚举DataType以及dataBody组成。
- Student和School为具体的数据结构,为dataBody的组成部分
一样,生成它对应的java文件。
Server服务端
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(6);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加自定义处理器
socketChannel.pipeline()
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(NettyMulProtobuf.MultipleMessage.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new PBMulNettyHandler());
}
});
int port = 8082;
//监听器,当服务绑定成功后执行
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//监听器,当停止服务后执行。
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
注意new ProtobufDecoder(NettyMulProtobuf.MultipleMessage.getDefaultInstance()),指定了数据解码类型为MultipleMessage。
Server handler
public class PBMulNettyHandler extends SimpleChannelInboundHandler<NettyMulProtobuf.MultipleMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyMulProtobuf.MultipleMessage msg) throws Exception {
//@1
int dataType = msg.getDataType().getNumber();
//@2
if(dataType == 0){
NettyMulProtobuf.Student student = msg.getStudent();
System.out.println(student.getName() + "," + student.getAge() + "," + student.getAddress());
//把消息写到缓冲区
ctx.write(msg);
} else if (dataType == 1){
NettyMulProtobuf.School school = msg.getSchool();
System.out.println(school.getSchoolName() + "," + school.getSchoolAddress());
//把消息写到缓冲区
ctx.write(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//刷新缓冲区,把消息发出去
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常处理,如果该handler不处理,将会传递到下一个handler
cause.printStackTrace();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("connect success!");
}
}
- @1 得到数据类型,0为Student,1为School。
- @2 判断类型,来获取正确的dataBody
Client handler
public class PBMulClientHandler extends SimpleChannelInboundHandler<NettyMulProtobuf.MultipleMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyMulProtobuf.MultipleMessage msg) throws Exception {
System.out.println("客户端收到信息:" + msg.getStudent().getAddress());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//组装Student数据
NettyMulProtobuf.MultipleMessage student = NettyMulProtobuf.MultipleMessage.newBuilder()
.setDataType(NettyMulProtobuf.MultipleMessage.DataType.StudentType).setStudent(NettyMulProtobuf.Student.newBuilder().setAddress("student address")
.setAge(10).setName("student name").build()).build();
//组装School数据
NettyMulProtobuf.MultipleMessage school = NettyMulProtobuf.MultipleMessage.newBuilder()
.setDataType(NettyMulProtobuf.MultipleMessage.DataType.SchoolType).setSchool(
NettyMulProtobuf.School.newBuilder().setSchoolName("school name")
.setSchoolAddress("school address")
.build()
).build();
ctx.write(student);
ctx.write(school);
ctx.flush();
}
}
Client handler 包装了不同的数据体发送到服务端。
到此,我们就可以实现支持多种数据结构的解析。
总结
Netty具体的组件或用法可以参考我其他文章。
协议系列文章:
Netty协议之 TCP(一)
Netty协议之 UDP(二)
框架系列文章:
Netty学习(一)Netty的概念及体系结构
源码可以见:Netty-Learn
还没有评论,来说两句吧...