【Netty】七、Netty自定义编码和解码器 浅浅的花香味﹌ 2023-10-09 15:07 64阅读 0赞 Netty自定义编码和解码 一、Netty自定义编码和解码)服务端代码 ProtocolServerProtocolServerHandler 客户端 ProtocolClientProtocolClientHandler 编解码器 RpcDecoder 、RpcEncoder 一、Netty自定义编码和解码)编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转,网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码,因为编解码器由两部分组成: Decoder(解码器)Encoder(编码器)自定义编码器和解码器,可以参考netty里面内置的一些编码器和解码器的代码,比如:StringDecoder、StringEncoderObjectDecoder、ObjectEncoder如下的例子是采用Fastjson将一个RpcMessage 的java对象在网络中传输所进行的编码和解码,更多场景可以采用该思路进行;解码: @Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { ByteBuf frame = in.retainedDuplicate();final String content = frame.toString(CharsetUtil.UTF_8);RpcMessage message = JSON.parseObject(content, RpcMessage.class);out.add(message);in.skipBytes(in.readableBytes());} 编码: @Overrideprotected void encode(ChannelHandlerContext ctx, RpcMessage msg, ByteBuf out) throws Exception { out.writeBytes(JSON.toJSONString(msg).getBytes(CharsetUtil.UTF_8));}服务端代码ProtocolServer/** * * Netty自定义编码和解码 服务端 */public class ProtocolServer { public static void main(String[] args) { ProtocolServer server = new ProtocolServer(); server.openSever(6666); } public void openSever(int port) { ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup bootGroup = new NioEventLoopGroup(1); //connect \accept \read \write EventLoopGroup workGroup = new NioEventLoopGroup(); bootstrap.group(bootGroup, workGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //解决粘包拆包 pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); //自定义的编码和解码 pipeline.addLast(new RpcDecoder()); pipeline.addLast(new RpcEncoder()); //业务处理handler pipeline.addLast(ProtocolServerHandler.INSTANCE); } }); try { System.out.println("服务启动成功"); ChannelFuture f = bootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bootGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }}ProtocolServerHandlerimport com.mytest.protocol.codec.RpcMessage;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;@ChannelHandler.Sharablepublic class ProtocolServerHandler extends ChannelInboundHandlerAdapter { public static final ProtocolServerHandler INSTANCE = new ProtocolServerHandler(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务端接收消息"); if (msg instanceof RpcMessage) { RpcMessage rpcMessage = (RpcMessage) msg; System.out.println("服务端接收到的消息:" + rpcMessage); //ctx.write(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); }}客户端ProtocolClientimport com.mytest.protocol.codec.RpcDecoder;import com.mytest.protocol.codec.RpcEncoder;import com.mytest.protocol.handler.ProtocolClientHandler;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import java.io.IOException;/** * Netty自定义编码和解码 客户端 */public class ProtocolClient { public static void main(String[] args) throws IOException { ProtocolClient client = new ProtocolClient("127.0.0.1", 6666); } public ProtocolClient(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(1); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.remoteAddress(host, port); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //netty的粘包和拆包 pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); //自定义的编码和解码 pipeline.addLast(new RpcDecoder()); pipeline.addLast(new RpcEncoder()); //业务处理的handler pipeline.addLast(ProtocolClientHandler.INSTANCE); } }); try { ChannelFuture f = bootstrap.connect().sync(); f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("连接关闭,资源释放"); group.shutdownGracefully(); } }}ProtocolClientHandlerimport com.mytest.protocol.codec.RpcMessage;import com.mytest.protocol.codec.RpcMessageType;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;@ChannelHandler.Sharablepublic class ProtocolClientHandler extends ChannelInboundHandlerAdapter { public static final ProtocolClientHandler INSTANCE = new ProtocolClientHandler(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //调用远程,传输一个java对象 RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setContent("消息"); rpcMessage.setSender("发送者"); StringBuffer stringBuffer = new StringBuffer(); for (int i=0; i<100; i++) { stringBuffer.append("这是一个消息字名字的"); } rpcMessage.setReceiver(stringBuffer.toString()); rpcMessage.setRpcMessageType(RpcMessageType.LOGIN); rpcMessage.setTime(System.currentTimeMillis()); for (int i=0; i<20; i++) { ctx.writeAndFlush(rpcMessage); } System.out.println("启动后消息发送完毕"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客户端接收"); if (msg instanceof RpcMessage) { RpcMessage rpcMessage = (RpcMessage) msg; System.out.println("客户端接收到的消息:" + rpcMessage); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}编解码器RpcDecoder 、RpcEncoder/** * 网络通信传输的 字节流数据 的解码器(字节流解码成程序能识别的数据(比如解码成一个java对象)) * */public class RpcDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { //复制一份 字节流数据 ByteBuf frame = byteBuf.retainedDuplicate(); //把字节流数据转成字符串 final String content = frame.toString(CharsetUtil.UTF_8); //把字符串通过fastjson转成java对象 RpcMessage message = JSON.parseObject(content, RpcMessage.class); //把得到的java对象传给下一个handler out.add(message); //buf的数据已经读取过了,跳过已经读取的数据,更新一下buf的index下标 byteBuf.skipBytes(byteBuf.readableBytes()); }}import com.alibaba.fastjson.JSON;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import io.netty.util.CharsetUtil;public class RpcEncoder extends MessageToByteEncoder<RpcMessage> { @Override protected void encode(ChannelHandlerContext ctx, RpcMessage msg, ByteBuf out) throws Exception { out.writeBytes(JSON.toJSONString(msg).getBytes(CharsetUtil.UTF_8)); }}@Datapublic class RpcMessage { private RpcMessageType rpcMessageType; //消息类型[LOGIN]或者[SYSTEM]或者[LOGOUT] private long time; //消息发送时间 private String sender; //发送人 private String receiver; //接收人 private Object content; //消息内容}public enum RpcMessageType { /** * 系统消息 */ SYSTEM("SYSTEM"), /** * 登录消息 */ LOGIN("LOGIN"), /** * 登出消息 */ LOGOUT("LOGOUT"), /** * 调用消息 */ INVOKE("INVOKE"); private String name; public static boolean isRpcMessageType(String content) { return content.matches("^\\[(SYSTEM|LOGIN|LOGIN|INVOKE)\\]"); } RpcMessageType(String name) { this.name = name; } public String getName() { return this.name; } public String toString() { return this.name; }}
相关 【Netty】七、Netty自定义编码和解码器 Netty自定义编码和解码 一、Netty自定义编码和解码) 服务端代码 ProtocolServer ProtocolServer 浅浅的花香味﹌/ 2023年10月09日 15:07/ 0 赞/ 65 阅读
相关 netty系列之:netty中的懒人编码解码器 简介 netty之所以强大,是因为它内置了很多非常有用的编码解码器,通过使用这些编码解码器可以很方便的搭建出非常强大的应用程序,今天给大家讲讲netty中最基本的内置编码 痛定思痛。/ 2023年10月03日 14:18/ 0 赞/ 133 阅读
相关 netty 4.1.45自定义编解码器 文章目录 概述 自定义编解码器实现机制 编码器 解码器 服务端代码 客户端代码 运行结果 > 注:更多netty相关文章请访问 逃离我推掉我的手/ 2023年07月04日 06:17/ 0 赞/ 74 阅读
相关 【Netty】高阶使用:自定义编解码器 当你通过 Netty 发送或者接受一个消息的时候,就将会发生一次数据转换 入站时 – 消息被解码(Decode):从字节(二进制)转换为我们能读懂并操作的格式(int、 谁践踏了优雅/ 2023年01月01日 03:41/ 0 赞/ 403 阅读
相关 Netty 编解码器 Netty 编解码 在网络中数据是以字节码二进制的形式传输的,所以我们在使用 Netty 传输数据的时候,需要将我们传输的数据转换为 二进制的形式 进行传输,所以不管是我 梦里梦外;/ 2022年11月05日 06:24/ 0 赞/ 324 阅读
相关 netty系列之:netty中的懒人编码解码器 文章目录 简介 netty中的内置编码器 使用codec要注意的问题 netty内置的基本codec base64 by 柔光的暖阳◎/ 2022年09月06日 13:29/ 0 赞/ 333 阅读
相关 netty系列之:自定义编码和解码器要注意的问题 文章目录 简介 自定义编码器和解码器的实现 ReplayingDecoder 总结 简介 在之前的系列文章中,我们提到了netty中的ch ╰+攻爆jí腚メ/ 2022年09月05日 12:52/ 0 赞/ 271 阅读
相关 netty系列之:自定义编码解码器 文章目录 简介 自定义编码器 自定义解码器 添加编码解码器到pipeline 计算2的N次方 总结 简介 在之前的netty系 电玩女神/ 2022年09月05日 00:24/ 0 赞/ 288 阅读
相关 netty自定义编码器和解码器(粘包处理) 这里的实现方式是:将消息分为两部分,也就是消息头和消息尾,消息头中写入要发送数据的总长度,通常是在消息头的第一个字段使用int值来标识发送数据的长度。 首先我们写一个 爱被打了一巴掌/ 2022年05月13日 09:58/ 0 赞/ 287 阅读
相关 Netty——编解码器 什么是编解码器 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和 目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器 比眉伴天荒/ 2021年08月30日 23:41/ 0 赞/ 647 阅读
还没有评论,来说两句吧...