初识Netty三(基于MessagePack实现编解码)
MessagePack介绍
目前主要的编解码:
- Java序列化
- JBoss Marshalling
- XML
- JSON
- MessagePack
- Protobuf
…
本次编解码是使用 MessagePack实现的,MessagePack主要优点:
- 高效
- 跨语言
官网解释:MessagePack 是一种高效的二进制序列化格式,可以像 JSON 一样在多种语言之间交换数据,但比 JSON 更快、更小。
更多介绍请自行查看官网:https://msgpack.org/
MessagePack 使用
引入依赖
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
MessagePack 编解码使用起来非常方便,像这样
/**
 * Round-trips a list of strings through MessagePack and prints the decoded result.
 *
 * @throws Exception if serialization or deserialization fails
 */
@org.junit.Test
public void test() throws Exception {
    List<String> list = Lists.newArrayList();
    list.add("msgpack");
    list.add("kumofs");
    list.add("viver");
    MessagePack msgpack = new MessagePack();
    // Serialize
    byte[] raw = msgpack.write(list);
    // Deserialize directly using a template
    List<String> list1 = msgpack.read(raw, Templates.tList(Templates.TString));
    // Fail loudly if the round trip altered the data.
    if (!list.equals(list1)) {
        throw new AssertionError("round trip mismatch: expected " + list + " but got " + list1);
    }
    // Print the decoded list, not the input, so the output reflects deserialization.
    list1.forEach(s -> System.out.println(s));
}
基于 MessagePack 编解码器实现
解码器(MsgPackDecoder)
/**
 * Inbound decoder that converts the bytes of a {@link ByteBuf} into an object
 * deserialized with MessagePack.
 */
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * Copies the readable bytes of {@code byteBuf} into an array, deserializes that
     * array with MessagePack and appends the result to {@code list}.
     *
     * @param channelHandlerContext context of this handler (not used here)
     * @param byteBuf buffer holding the bytes to decode
     * @param list output list that receives the deserialized object
     * @throws Exception if MessagePack cannot read the bytes
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Size the array to exactly the readable region of the buffer.
        final byte[] payload = new byte[byteBuf.readableBytes()];
        // Absolute get starting at the current reader index.
        byteBuf.getBytes(byteBuf.readerIndex(), payload, 0, payload.length);
        // Deserialize and pass the result on through the output list.
        final MessagePack unpacker = new MessagePack();
        list.add(unpacker.read(payload));
    }
}
编码器(MsgPackEncoder)
/**
 * Outbound encoder built on {@link MessageToByteEncoder}: serializes an arbitrary
 * object (POJO) to a byte array with MessagePack and writes it into the
 * outgoing {@link ByteBuf}.
 *
 * @author WH
 * @version 1.0
 * @date 2020/5/28 23:49
 */
public class MsgPackEncoder extends MessageToByteEncoder<Object> {

    /**
     * Serializes {@code o} and appends the resulting bytes to {@code byteBuf}.
     *
     * @param channelHandlerContext context of this handler (not used here)
     * @param o object to serialize
     * @param byteBuf destination buffer for the serialized bytes
     * @throws Exception if MessagePack cannot serialize the object
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        final MessagePack packer = new MessagePack();
        // Serialize, then write the bytes straight into the outbound buffer.
        byteBuf.writeBytes(packer.write(o));
    }
}
客户端实现
EchoClient
/**
 * Echo client: connects to 127.0.0.1:8080 and exchanges MessagePack-encoded,
 * length-prefixed messages with the server.
 */
public class EchoClient {

    /**
     * Configures the client bootstrap, connects, and blocks until the channel is closed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // Frame handlers use a 2-byte length field; the codecs sit behind them.
                            ch.pipeline()
                                    .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                                    .addLast("msgpack decoder", new MsgPackDecoder())
                                    .addLast("frameEncoder", new LengthFieldPrepender(2))
                                    .addLast("msgpack encoder", new MsgPackEncoder())
                                    .addLast(new EchoClientHandler(10));
                        }
                    });
            // Start the asynchronous connect and wait for it to complete,
            // then wait until the client channel is closed.
            b.connect("127.0.0.1", 8080).sync()
                    .channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Graceful shutdown: release the NIO thread group.
            group.shutdownGracefully();
        }
    }
}
EchoClientHandler
@Slf4j
public class EchoClientHandler extends ChannelHandlerAdapter {
private final int sendNumber;
private static List<User> users;
public EchoClientHandler(int sendNumber) {
this.sendNumber = sendNumber;
users = new ArrayList<>(sendNumber);
for (int i = 0; i < sendNumber; i++) {
String name = "阿离" + i;
User user = new User(name, i);
users.add(user);
}
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (User user : users) {
ctx.write(user);
log.info("客户端发送消息为" + user);
ctx.writeAndFlush(user);
}
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("接收到服务器的消息: {}", msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服务端实现
EchoServer
/**
 * Echo server: listens on port 8080 and exchanges MessagePack-encoded,
 * length-prefixed messages with clients.
 */
public class EchoServer {

    /**
     * Configures the server bootstrap, binds the port, and blocks until the
     * listening channel is closed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Set up the server-side NIO thread groups.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // Frame handlers use a 2-byte length field; the codecs sit behind them.
                            ch.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2))
                                    .addLast(new MsgPackDecoder())
                                    .addLast(new LengthFieldPrepender(2))
                                    .addLast(new MsgPackEncoder())
                                    .addLast(new EchoServerHandler());
                        }
                    });
            // Bind the port and wait for the bind to succeed,
            // then wait until the server's listening channel is closed.
            b.bind(8080).sync()
                    .channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Graceful shutdown: release the thread pool resources.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
EchoServerHandler
@Slf4j
public class EchoServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(msg);
log.info("服务端接受到的消息为:{}", msg);
}
}
测试
源码下载
https://github.com/weihubeats/netty-student
还没有评论,来说两句吧...