Netty 整合 MessagePack 序列化框架 + LengthFieldBasedFrameDecoder 自定义解码器 超、凢脫俗 2022-04-14 06:15 596阅读 0赞 目录 环境准备及说明 MessagePack 编解码器 MessagePack 编码器 MessagePack 解码器 POJO User Netty 网络通信 服务端 客户端 运行测试 环境准备及说明 如果是导入二进制开发包,则如下所示: 需要开发包的可以参考《 MessagePack 开发入门详解》。 如果是 Maven 项目,则添加如下依赖: <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.30.Final</version> </dependency> <!-- https://mvnrepository.com/artifact/org.msgpack/msgpack --> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.12</version> </dependency> <!-- https://mvnrepository.com/artifact/org.javassist/javassist --> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.24.0-GA</version> </dependency> 1)netty-all:是 Netty 开发包 2)msgpack:是 Messagepck 序列化开发包 3)javassist:是 msgpack 自己的依赖包 本文示例项目结构如下: 1)User:网络传输的 POJO 对象,注意:序列化 POJO 必须加 org.msgpack.annotation.Message 注解:@Message 2)echo:包中为 netty 通信的客户端与服务端 3)messagepack:包中为 MessagePack 编解码器 特别提醒: 1)虽然 MessagePack 用于序列化对象,但是普通 String、Integer 等等同样也是对象,所以照样可以传输普通的字符串等消息2)需要序列化的 POJO 对象上必须加上 org.msgpack.annotation.Message 注解:@Message,否则传输会失败,而且也不报错,很难排查3)MessagePack 序列化对象后的消息,经过发送后,接收端 channelRead(ChannelHandlerContext ctx, Object msg) 3.1)即使发送的是 User 对象,接收端的 msg 也不能进行 User user = (User)msg 强转,否则客户端会被强制断开连接 3.2)如果发送的是 User 对象,接收端可以转为 List objects = (List) msg,list 中的元素对应 User 的属性值 3.3)如果发送的不是 POJO 对象,而是简单的 String 对象,则不能转为 List,否则客户端也会被强制断开 MessagePack 编解码器 利用 Netty 的编解码框架可以非常方便的集成第三方序列化框架,Netty 预集成了几种常用的编解码框架,用户也可以根据自己项目的实际情况集成其它编解码框架,或者进行自定义。MessagePack 编码器package com.example.messagepack;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import org.msgpack.MessagePack;/** * Created by Administrator on 2018/11/25 0025. * MessagePack 编码器 —— 继承 Netty 的 MessageToByteEncoder,比重写方法 */public class MsgpackEncoder extends MessageToByteEncoder<Object> { /** * 重写方法,负责将 Object 类型的 POJO 对象编码为 byte 数组,然后写入 ByteBuf 中 * * @param channelHandlerContext * @param o * @param byteBuf * @throws Exception */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { MessagePack messagePack = new MessagePack(); /** 序列化对象*/ byte[] raw = messagePack.write(o); byteBuf.writeBytes(raw); }}MessagePack 解码器package com.example.messagepack;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import org.msgpack.MessagePack;import java.util.List;/** * Created by Administrator on 2018/11/25 0025. * MessagePack 解码器 - 继承 Netty 的 MessageToMessageDecoder,并重写方法 */public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> { /** * 重写方法,首先从数据报 byteBuf 中获取需要解码的 byte 数组, * 然后调用 MessagePack 的 read 方法将其反序列化为 Object 对象,将解码后的对象加入到解码列表 list 中, * 这样就完成了 MessagePack 的解码操作 * * @param channelHandlerContext * @param byteBuf * @param list * @throws Exception */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { int length = byteBuf.readableBytes(); byte[] array = new byte[length]; byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length); MessagePack messagePack = new MessagePack(); list.add(messagePack.read(array)); }}POJO Userpackage com.example.domain;import org.msgpack.annotation.Message;import java.util.Date;/** * Created by Administrator on 2018/11/25 0025. * 用户 实体 * 需要序列化的 POJO 对象上必须加上 org.msgpack.annotation.Message 注解:@Message */@Messagepublic class User { private Integer pId; private String pName; private Date birthday; private Boolean isMarry; public Date getBirthday() { return birthday; } public void setBirthday(Date birthday) { this.birthday = birthday; } public Integer getpId() { return pId; } public void setpId(Integer pId) { this.pId = pId; } public String getpName() { return pName; } public void setpName(String pName) { this.pName = pName; } public Boolean getIsMarry() { return isMarry; } public void setIsMarry(Boolean isMarry) { this.isMarry = isMarry; } @Override public String toString() { return "User{" + "birthday=" + birthday + ", pId=" + pId + ", pName='" + pName + '\'' + ", isMarry=" + isMarry + '}'; }}Netty 网络通信 首先模拟的情况是:客户端连接上服务器后,给服务器连发消息,服务器接收后会将原信息进回复,同时会解决 TCP 粘包与拆包。会使用 Netty 的 LengthFieldPrepender、LengthFieldBasedFrameDecoder 编解码器处理半包消息,不会出现 TCP 粘包/拆包。服务端EchoServer 内容如下: package com.example.echo;import com.example.messagepack.MsgpackDecoder;import com.example.messagepack.MsgpackEncoder;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;/** * Created by Administrator on 2018/11/11 0011. * Echo 服务器 */public class EchoServer { public static void main(String[] args) { int port = 9898; new EchoServer().bind(port); } public void bind(int port) { /** * interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService * 配置服务端的 NIO 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组 * bossGroup 用于服务端接受客户端连接,workerGroup 用于进行 SocketChannel 网络读写*/ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { /** ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度 * */ final ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP连接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println(Thread.currentThread().getName() + ",服务器初始化通道..."); /** * 为了处理半包消息,添加如下两个 Netty 内置的编解码器 * LengthFieldPrepender:前置长度域编码器——放在MsgpackEncoder编码器前面 * LengthFieldBasedFrameDecoder:长度域解码器——放在MsgpackDecoder解码器前面 * 关于 长度域编解码器处理半包消息,本文不做详细讲解,会有专门篇章进行说明 */ ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2)); ch.pipeline().addLast("MessagePack encoder", new MsgpackEncoder()); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); ch.pipeline().addLast("MessagePack Decoder", new MsgpackDecoder()); ch.pipeline().addLast(new EchoServerHandler()); } }); /**服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成*/ ChannelFuture f = b.bind(port).sync(); System.out.println(Thread.currentThread().getName() + ",服务器开始监听端口,等待客户端连接........."); /**下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束* */ f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { /**优雅退出,释放线程池资源*/ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}EchoServerHandler 内容如下: package com.example.echo;import com.example.domain.User;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;/** * Created by Administrator on 2017/5/16. * ChannelInboundHandlerAdapter extends ChannelHandlerAdapter 用于对网络事件进行读写操作 */public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 因为多线程,所以使用原子操作类来进行计数 */ private static AtomicInteger atomicInteger = new AtomicInteger(); /** * 收到客户端消息,自动触发 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println((atomicInteger.addAndGet(1)) + "--->" + Thread.currentThread().getName() + ",The server receive order : " + msg); /** * 如果传输的是 POJO 对象,则可以转成 List<Object> * list 中的每一个元素都是发送来的 POJO 对象的属性值 * 注意:如果对方传输只是简单的 String 对象,则不能强转为 List<Object> */ /* List<Object> objects = (List<Object>) msg; for (Object obj : objects) { System.out.println("属性:" + obj); }*/ /** * 服务端接收到客户端发送来的数据后,再回发给客户端 */ ctx.writeAndFlush(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("-----客户端关闭:" + ctx.channel().remoteAddress()); /**当发生异常时,关闭 ChannelHandlerContext,释放和它相关联的句柄等资源 */ ctx.close(); }}客户端EchoClient 内容如下: package com.example.echo;import com.example.messagepack.MsgpackDecoder;import com.example.messagepack.MsgpackEncoder;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import io.netty.handler.codec.string.StringDecoder;/** * Created by Administrator on 2017/5/16. * Echo 客户端 */public class EchoClient { /** * 使用 2 个线程模拟 2 个客户端 * * @param args */ public static void main(String[] args) { for (int i = 0; i < 2; i++) { new Thread(new MyThread()).start(); } } static class MyThread implements Runnable { @Override public void run() { connect("192.168.1.20", 9898); } public void connect(String host, int port) { /**配置客户端 NIO 线程组/池*/ EventLoopGroup group = new NioEventLoopGroup(); try { /**Bootstrap 与 ServerBootstrap 都继承(extends)于 AbstractBootstrap * 创建客户端辅助启动类,并对其配置,与服务器稍微不同,这里的 Channel 设置为 NioSocketChannel * 然后为其添加 Handler,这里直接使用匿名内部类,实现 initChannel 方法 * 作用是当创建 NioSocketChannel 成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件*/ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) // 设置TCP连接超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { System.out.println(Thread.currentThread().getName() + ",客户端初始化管道..."); /** * 为了处理半包消息,添加如下两个 Netty 内置的编解码器 * LengthFieldPrepender:前置长度域编码器——放在MsgpackEncoder编码器前面 * LengthFieldBasedFrameDecoder:长度域解码器——放在MsgpackDecoder解码器前面 * 关于 长度域编解码器处理半包消息,本文不做详细讲解,会有专门篇章进行说明 */ ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2)); ch.pipeline().addLast("MessagePack encoder", new MsgpackEncoder()); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); ch.pipeline().addLast("MessagePack Decoder", new MsgpackDecoder()); ch.pipeline().addLast(new EchoClientHandler()); } }); /**connect:发起异步连接操作,调用同步方法 sync 等待连接成功*/ ChannelFuture channelFuture = b.connect(host, port).sync(); System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接.........."); /**等待客户端链路关闭*/ channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { /**优雅退出,释放NIO线程组*/ group.shutdownGracefully(); } } }}EchoClientHandler 内容如下: package com.example.echo;import com.example.domain.User;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.Arrays;import java.util.Date;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;/** * Created by Administrator on 2017/5/17. * 用于对网络事件进行读写操作 */public class EchoClientHandler extends ChannelInboundHandlerAdapter { /** * 因为 Netty 采用线程池,所以这里使用原子操作类来进行计数 */ private static AtomicInteger atomicInteger = new AtomicInteger(); /** * 当客户端和服务端 TCP 链路建立成功之后,Netty 的 NIO 线程会调用 channelActive 方法 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { /** * 多余 数组、List、Set、Map 等,对立面的元素逐个进行发送,则对方也是逐个接收 * 否则如果直接发送 数组、List、Set、Map 等,则对方会统一接收 * 注意:因为使用LengthFieldPrepender、LengthFieldBasedFrameDecoder编解码器处理半包消息 * 所以这里连续发送也不会出现 TCP 粘包/拆包 */ List<User> users = getUserArrayData(); for (User user : users) { ctx.writeAndFlush(user); } ctx.writeAndFlush("我是普通的字符串消息" + Thread.currentThread().getName()); } /** * 当服务端返回应答消息时,channelRead 方法被调用,从 Netty 的 ByteBuf 中读取并打印应答消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println((atomicInteger.addAndGet(1)) + "---" + Thread.currentThread().getName() + ",Server return Message:" + msg); } /** * 当发生异常时,打印异常 日志,释放客户端资源 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { /**释放资源*/ ctx.close(); } /** * 设置网络传输的 POJO 对象数组/列表 * * @return */ public List<User> getUserArrayData() { User[] users = new User[5]; User loopUser = null; for (int i = 0; i < 5; i++) { loopUser = new User(); loopUser.setpId(i + 1); loopUser.setpName("华安" + Thread.currentThread().getName()); loopUser.setIsMarry(true); loopUser.setBirthday(new Date()); users[i] = loopUser; } return Arrays.asList(users); }}运行测试 先运行服务器,再运行客户端。
相关 【Netty】七、Netty自定义编码和解码器 Netty自定义编码和解码 一、Netty自定义编码和解码) 服务端代码 ProtocolServer ProtocolServer 浅浅的花香味﹌/ 2023年10月09日 15:07/ 0 赞/ 34 阅读
相关 Netty学习08--自定义序列化协议之自定义序列化协议 Serializer package core; import java.nio.charset.Charset; import j 小咪咪/ 2023年02月18日 08:28/ 0 赞/ 136 阅读
相关 netty编解码器与序列化框架分析 netty编解码器分析 编码(Encode)也称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。 反之,解码(D 分手后的思念是犯贱/ 2022年12月20日 03:30/ 0 赞/ 207 阅读
相关 netty系列之:自定义编码解码器 文章目录 简介 自定义编码器 自定义解码器 添加编码解码器到pipeline 计算2的N次方 总结 简介 在之前的netty系 电玩女神/ 2022年09月05日 00:24/ 0 赞/ 273 阅读
相关 Netty(编解码器框架) 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器 由编码器和解 古城微笑少年丶/ 2022年04月17日 06:07/ 0 赞/ 400 阅读
相关 Netty 整合 MessagePack 序列化框架 + LengthFieldBasedFrameDecoder 自定义解码器 目录 环境准备及说明 MessagePack 编解码器 MessagePack 编码器 MessagePack 解码器 POJO User Netty 网络通信 超、凢脫俗/ 2022年04月14日 06:15/ 0 赞/ 597 阅读
相关 《序列化与自定义Request、Response编解码器》 《序列化与自定义Request、Response编解码器》 序列化是如何实现的 序列化三种底层实现方式 使用JDK的ByteArra 「爱情、让人受尽委屈。」/ 2022年02月23日 02:50/ 0 赞/ 477 阅读
相关 Netty中序列化框架MessagePack的简单实现 MessagePack是一个高效的二进制序列化框架,它像JSON一样支持不同语言间的数据交换,但是它的性能更快,序列化之后的码流也更小。MessagePack的特点如下: ゝ一纸荒年。/ 2022年02月20日 06:15/ 0 赞/ 297 阅读
相关 [Netty]LengthFieldBasedFrameDecoder 作者:简书闪电侠 链接:https://www.jianshu.com/p/a0a51fd79f62 拆包的原理 关于拆包原理的上一篇博文 [netty源码分析之拆 - 日理万妓/ 2022年01月29日 05:37/ 0 赞/ 256 阅读
相关 Netty自定义序列化协议 自定义序列化协议 序列化的目的就是想对象转化成字节数组byteArray 阶段一 使用流的形式 public class Test{ public 谁践踏了优雅/ 2021年12月14日 12:03/ 0 赞/ 476 阅读
还没有评论,来说两句吧...