A Simple Protobuf Serialization Example in Netty
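What are protocol buffers
Protobuf is a platform-neutral, language-neutral, extensible, lightweight and efficient protocol for serializing structured data. It can be used for network communication and for data storage.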
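Getting started with Protobuf
1. Setting up the development environment
At the time of writing, the latest official protobuf release is 3.7.x (https://github.com/protocolbuffers/protobuf/releases). protobuf 2 and protobuf 3 differ considerably. Hadoop uses protobuf for its serialization. This article uses version 2.5; the official site no longer offers a download link for it, so I have put copies (Windows and Linux) on Baidu Cloud:
Link: https://pan.baidu.com/s/1kxhlNqlu2Z3_E65Zi7W7Pg
Extraction code: vcqh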
Define the proto files
SubscribeReq.proto
package netty;

option java_package = "com.dpb.netty.codec.protobuf";
option java_outer_classname = "SubscribeReqProto";

message SubscribeReq {
    required int32 subReqID = 1;
    required string userName = 2;
    required string preductName = 3;
    repeated string address = 4;
}
SubscribeResp.proto
package netty;

option java_package = "com.dpb.netty.codec.protobuf";
option java_outer_classname = "SubscribeRespProto";

message SubscribeResp {
    required int32 subReqID = 1;
    required int32 respCode = 2;
    required string desc = 3;
}
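To make the field numbers in these definitions more concrete, here is a minimal sketch of how protobuf lays fields out on the wire: each field is written as a tag, (field_number << 3) | wire_type, followed by its value. The class WireFormatSketch and its methods are hypothetical helpers written for illustration using only the JDK; they are not part of protobuf or of the generated code. The sketch assumes non-negative int32 values and encodes only the first two SubscribeReq fields, so the bytes are a partial message (a real parser would be expected to complain about the missing required preductName).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only: hand-encodes subReqID (field 1) and userName (field 2).
final class WireFormatSketch {
    private static final int WIRETYPE_VARINT = 0;           // used by int32
    private static final int WIRETYPE_LENGTH_DELIMITED = 2; // used by string

    // Base-128 varint: 7 payload bits per byte, high bit set on every byte except the last.
    // Assumption: value is non-negative (negative int32 values are encoded differently).
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // A tag combines the field number from the .proto file with the wire type.
    static void writeTag(ByteArrayOutputStream out, int fieldNumber, int wireType) {
        writeVarint(out, (fieldNumber << 3) | wireType);
    }

    static byte[] encode(int subReqID, String userName) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeTag(out, 1, WIRETYPE_VARINT);
        writeVarint(out, subReqID);
        byte[] name = userName.getBytes(StandardCharsets.UTF_8);
        writeTag(out, 2, WIRETYPE_LENGTH_DELIMITED);
        writeVarint(out, name.length);   // strings are prefixed with their byte length
        out.write(name, 0, name.length);
        return out.toByteArray();
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: 08 01 12 04 62 6f 62 6f
        System.out.println(toHex(encode(1, "bobo")));
    }
}
```

The field names never appear in the bytes; only the numbers 1 and 2 do, which is why the numbers in a .proto file must stay stable once messages are in use.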
Generate the Java files
Open a command prompt (cmd) and change to the directory that contains the .proto files.
Run the following two commands:
C:\tools>protoc-2.5.0-windows-x86_64.exe --java_out=./ SubscribeReq.proto
C:\tools>protoc-2.5.0-windows-x86_64.exe --java_out=./ SubscribeResp.proto
The corresponding Java files are generated in that directory. Copy them into your Eclipse workspace.
2. Encoding and decoding example
This example demonstrates protobuf encoding and decoding.
package com.dpb.netty.codec;

import java.util.ArrayList;
import java.util.List;

import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import com.dpb.netty.codec.protobuf.SubscribeReqProto.SubscribeReq;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * protobuf encoding and decoding example
 *
 * @author 波波烤鸭
 * @email dengpbs@163.com
 */
public class TestSubscribeReqProto {

    /** Encodes a SubscribeReq into a byte array. */
    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
        return req.toByteArray();
    }

    /** Decodes a byte array back into a SubscribeReq. */
    private static SubscribeReqProto.SubscribeReq decode(byte[] body)
            throws InvalidProtocolBufferException {
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }

    /** Builds a sample SubscribeReq object. */
    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(1);
        builder.setUserName("bobo");
        builder.setPreductName("Netty");
        List<String> address = new ArrayList<>();
        address.add("beijing");
        address.add("guangzhou");
        address.add("shezheng");
        builder.addAllAddress(address);
        return builder.build();
    }

    public static void main(String[] args) throws InvalidProtocolBufferException {
        SubscribeReqProto.SubscribeReq req = createSubscribeReq();
        System.out.println("Before encoding: " + req.toString());
        // Round trip: encode to bytes, then decode into a new object
        SubscribeReq req2 = decode(encode(req));
        // The original object is unchanged by encoding
        System.out.println("After encoding: " + req);
        // The decoded copy
        System.out.println("After decoding: " + req2);
        System.out.println(req2.equals(req));
    }
}
Output:
Before encoding: subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
After encoding: subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
After decoding: subReqID: 1
userName: "bobo"
preductName: "Netty"
address: "beijing"
address: "guangzhou"
address: "shezheng"
true
The output shows that the content is identical before and after encoding, and that the original and decoded objects are equal.
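As a rough picture of what decoding involves, the sketch below walks a byte array tag by tag, the way a protobuf parser conceptually does: read a varint tag, split it into field number and wire type, then read the value. WireFormatReaderSketch is a hypothetical JDK-only helper, not the generated parseFrom method. It assumes only varint and length-delimited (string) fields occur, which covers the field types in SubscribeReq, and the sample bytes in main were worked out by hand from the wire-format rules.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: lists the fields found in a protobuf-encoded byte array.
final class WireFormatReaderSketch {
    private final byte[] data;
    private int pos;

    WireFormatReaderSketch(byte[] data) {
        this.data = data;
    }

    // Reads a base-128 varint starting at the current position.
    private long readVarint() {
        long result = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            if (pos >= data.length) {
                throw new IllegalArgumentException("truncated varint");
            }
            int b = data[pos++] & 0xFF;
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("varint too long");
    }

    List<String> describe() {
        List<String> fields = new ArrayList<>();
        while (pos < data.length) {
            long tag = readVarint();
            int fieldNumber = (int) (tag >>> 3); // upper bits: field number from the .proto file
            int wireType = (int) (tag & 0x7);    // lower three bits: how the value is stored
            if (wireType == 0) {
                fields.add("field " + fieldNumber + " = " + readVarint());
            } else if (wireType == 2) {
                int len = (int) readVarint();
                if (len < 0 || len > data.length - pos) {
                    throw new IllegalArgumentException("truncated field " + fieldNumber);
                }
                String text = new String(data, pos, len, StandardCharsets.UTF_8);
                pos += len;
                fields.add("field " + fieldNumber + " = \"" + text + "\"");
            } else {
                throw new IllegalArgumentException("unsupported wire type " + wireType);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // subReqID = 1, userName = "bobo", one address entry "beijing"
        byte[] sample = {
            0x08, 0x01,
            0x12, 0x04, 'b', 'o', 'b', 'o',
            0x22, 0x07, 'b', 'e', 'i', 'j', 'i', 'n', 'g'
        };
        for (String field : new WireFormatReaderSketch(sample).describe()) {
            System.out.println(field);
        }
    }
}
```

A repeated field such as address simply appears once per element with the same field number, which is how the three addresses in the example survive the round trip in order.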
Protobuf example in Netty
Server program
SubReqServer
package com.dpb.netty.codec;

import com.dpb.netty.codec.protobuf.SubscribeReqProto;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Book subscription server
 *
 * @author 波波烤鸭
 * @email dengpbs@163.com
 */
public class SubReqServer {

    private void bind(int port) throws Exception {
        // Configure the server-side NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        // Inbound: handle half packets by splitting the stream into frames
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // Inbound: decode each frame into a SubscribeReq
                        ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                        // Outbound: prepend a varint32 length field so the peer can split frames
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // Outbound: encode protobuf messages into bytes
                        ch.pipeline().addLast(new ProtobufEncoder());
                        ch.pipeline().addLast(new SubReqServerHandler());
                    }
                });
            // Bind the port and wait synchronously until the bind succeeds
            ChannelFuture f = b.bind(port).sync();
            // Wait until the server's listening channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the thread pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new SubReqServer().bind(8080);
    }
}
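The half-packet handling in the pipeline above relies on length-prefixed framing: the sender writes the body length as a varint before each message, and the receiver buffers bytes until a whole body has arrived. The following is a minimal JDK-only sketch of that idea. VarintFramingSketch is a hypothetical class for illustration; Netty's actual handlers operate on ByteBuf inside the pipeline and are not implemented this way line for line.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of varint32 length-prefixed framing over a byte stream.
final class VarintFramingSketch {
    private byte[] buffer = new byte[0]; // bytes received but not yet consumed

    // Outbound side: prefix the body with its length encoded as a base-128 varint.
    static byte[] frame(byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = body.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.write(len);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    // Inbound side: append a chunk and return every complete body now available.
    List<byte[]> feed(byte[] chunk) {
        byte[] merged = Arrays.copyOf(buffer, buffer.length + chunk.length);
        System.arraycopy(chunk, 0, merged, buffer.length, chunk.length);
        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (true) {
            int length = 0;
            int shift = 0;
            int cursor = pos;
            boolean headerComplete = false;
            while (cursor < merged.length && shift < 35) {
                int b = merged[cursor++] & 0xFF;
                length |= (b & 0x7F) << shift;
                shift += 7;
                if ((b & 0x80) == 0) {
                    headerComplete = true;
                    break;
                }
            }
            if (!headerComplete) {
                if (shift >= 35) {
                    throw new IllegalStateException("length prefix longer than 5 bytes");
                }
                break; // the length prefix itself is split across reads
            }
            if (length < 0) {
                throw new IllegalStateException("negative frame length");
            }
            if (merged.length - cursor < length) {
                break; // half packet: wait for the rest of the body
            }
            frames.add(Arrays.copyOfRange(merged, cursor, cursor + length));
            pos = cursor + length;
        }
        buffer = Arrays.copyOfRange(merged, pos, merged.length); // keep the unconsumed tail
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = frame("netty".getBytes(StandardCharsets.UTF_8));
        byte[] stream = new byte[first.length + second.length];
        System.arraycopy(first, 0, stream, 0, first.length);
        System.arraycopy(second, 0, stream, first.length, second.length);

        // Simulate TCP delivering one and a half frames first, then the remainder.
        int split = first.length + 2;
        VarintFramingSketch decoder = new VarintFramingSketch();
        for (byte[] body : decoder.feed(Arrays.copyOfRange(stream, 0, split))) {
            System.out.println(new String(body, StandardCharsets.UTF_8));
        }
        for (byte[] body : decoder.feed(Arrays.copyOfRange(stream, split, stream.length))) {
            System.out.println(new String(body, StandardCharsets.UTF_8));
        }
    }
}
```

Both peers have to agree on this framing, which is why the server and client pipelines each install a frame decoder for inbound data and a length prepender for outbound data.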
SubReqServerHandler
package com.dpb.netty.codec;

import com.dpb.netty.codec.protobuf.SubscribeReqProto;
import com.dpb.netty.codec.protobuf.SubscribeRespProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

// Note: overriding channelRead on ChannelHandlerAdapter matches the Netty 5.0 alpha API.
// On Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class SubReqServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The ProtobufDecoder has already turned the frame into a SubscribeReq
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
        // Check the user name
        if ("bobo".equalsIgnoreCase(req.getUserName())) {
            System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
            // Write the response back to the client
            ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    private SubscribeRespProto.SubscribeResp resp(int subReqID) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setRespCode(0);
        builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
        return builder.build();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close(); // Close the connection when an exception occurs
    }
}
Client program
SubReqClient
package com.dpb.netty.codec;

import com.dpb.netty.codec.protobuf.SubscribeRespProto;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class SubReqClient {

    public void connect(int port, String host) throws Exception {
        // Configure the client-side NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // Inbound: split the stream into frames, then decode SubscribeResp
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                        // Outbound: prepend the length field, then encode the message
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        ch.pipeline().addLast(new ProtobufEncoder());
                        ch.pipeline().addLast(new SubReqClientHandler());
                    }
                });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();
            // Wait until the client connection is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group
            group.shutdownGracefully();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new SubReqClient().connect(port, "127.0.0.1");
    }
}
SubReqClientHandler
package com.dpb.netty.codec;

import java.util.ArrayList;
import java.util.List;

import com.dpb.netty.codec.protobuf.SubscribeReqProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

// Note: overriding channelActive/channelRead on ChannelHandlerAdapter matches the Netty 5.0 alpha API.
// On Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class SubReqClientHandler extends ChannelHandlerAdapter {

    /**
     * Creates a client-side handler.
     */
    public SubReqClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Queue ten requests, then flush them in one go
        for (int i = 0; i < 10; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    private SubscribeReqProto.SubscribeReq subReq(int i) {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(i);
        builder.setUserName("bobo");
        builder.setPreductName("Netty Book For Protobuf");
        List<String> address = new ArrayList<>();
        address.add("NanJing");
        address.add("BeiJing");
        address.add("ShenZhen");
        builder.addAllAddress(address);
        return builder.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Receive server response : [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
Testing
Server output:
Service accept client subscribe req : [subReqID: 0
userName: "bobo"
preductName: "Netty Book For Protobuf"
address: "NanJing"
address: "BeiJing"
address: "ShenZhen"
]
.......
Service accept client subscribe req : [subReqID: 9
userName: "bobo"
preductName: "Netty Book For Protobuf"
address: "NanJing"
address: "BeiJing"
address: "ShenZhen"
]
Client output:
Receive server response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
....
Receive server response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
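The results show that the example built on Netty's protobuf codec framework works correctly. With the protobuf codec support that Netty provides, we can easily support protobuf encoding and decoding without needing to understand the details of protobuf's implementation and usage, which makes it convenient to implement cross-language remote service calls and to integrate with surrounding heterogeneous systems.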