【Netty】七、Netty自定义编码和解码器

浅浅的花香味﹌ 2023-10-09 15:07 64阅读 0赞

Netty自定义编码和解码

  • 一、Netty自定义编码和解码)
  • 服务端代码
    • ProtocolServer
    • ProtocolServerHandler
  • 客户端
    • ProtocolClient
    • ProtocolClientHandler
  • 编解码器
    • RpcDecoder 、RpcEncoder

一、Netty自定义编码和解码)

编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转,网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码,因为编解码器由两部分组成:

  • Decoder(解码器)
  • Encoder(编码器)
    自定义编码器和解码器,可以参考netty里面内置的一些编码器和解码器的代码,比如:
    StringDecoder、StringEncoder
    ObjectDecoder、ObjectEncoder
    如下的例子是采用Fastjson将一个RpcMessage 的java对象在网络中传输所进行的编码和解码,更多场景可以采用该思路进行;
  • 解码:

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {

    1. ByteBuf frame = in.retainedDuplicate();
    2. final String content = frame.toString(CharsetUtil.UTF_8);
    3. RpcMessage message = JSON.parseObject(content, RpcMessage.class);
    4. out.add(message);
    5. in.skipBytes(in.readableBytes());

    }

    编码:

    1. @Override
    2. protected void encode(ChannelHandlerContext ctx, RpcMessage msg, ByteBuf out) throws Exception {
    3. out.writeBytes(JSON.toJSONString(msg).getBytes(CharsetUtil.UTF_8));
    4. }

    服务端代码

    ProtocolServer

    1. /**
    2. * * Netty自定义编码和解码 服务端
    3. */
    4. public class ProtocolServer {
    5. public static void main(String[] args) {
    6. ProtocolServer server = new ProtocolServer();
    7. server.openSever(6666);
    8. }
    9. public void openSever(int port) {
    10. ServerBootstrap bootstrap = new ServerBootstrap();
    11. EventLoopGroup bootGroup = new NioEventLoopGroup(1); //connect \accept \read \write
    12. EventLoopGroup workGroup = new NioEventLoopGroup();
    13. bootstrap.group(bootGroup, workGroup);
    14. bootstrap.channel(NioServerSocketChannel.class);
    15. bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    16. @Override
    17. protected void initChannel(SocketChannel ch) throws Exception {
    18. ChannelPipeline pipeline = ch.pipeline();
    19. //解决粘包拆包
    20. pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
    21. pipeline.addLast(new LengthFieldPrepender(4));
    22. //自定义的编码和解码
    23. pipeline.addLast(new RpcDecoder());
    24. pipeline.addLast(new RpcEncoder());
    25. //业务处理handler
    26. pipeline.addLast(ProtocolServerHandler.INSTANCE);
    27. }
    28. });
    29. try {
    30. System.out.println("服务启动成功");
    31. ChannelFuture f = bootstrap.bind(port).sync();
    32. f.channel().closeFuture().sync();
    33. } catch (InterruptedException e) {
    34. e.printStackTrace();
    35. } finally {
    36. bootGroup.shutdownGracefully();
    37. workGroup.shutdownGracefully();
    38. }
    39. }
    40. }

    ProtocolServerHandler

    1. import com.mytest.protocol.codec.RpcMessage;
    2. import io.netty.channel.ChannelHandler;
    3. import io.netty.channel.ChannelHandlerContext;
    4. import io.netty.channel.ChannelInboundHandlerAdapter;
    5. @ChannelHandler.Sharable
    6. public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {
    7. public static final ProtocolServerHandler INSTANCE = new ProtocolServerHandler();
    8. @Override
    9. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    10. System.out.println("服务端接收消息");
    11. if (msg instanceof RpcMessage) {
    12. RpcMessage rpcMessage = (RpcMessage) msg;
    13. System.out.println("服务端接收到的消息:" + rpcMessage);
    14. //ctx.write(msg);
    15. }
    16. }
    17. @Override
    18. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    19. ctx.flush();
    20. }
    21. @Override
    22. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    23. cause.printStackTrace();
    24. }
    25. }

    客户端

    ProtocolClient

    1. import com.mytest.protocol.codec.RpcDecoder;
    2. import com.mytest.protocol.codec.RpcEncoder;
    3. import com.mytest.protocol.handler.ProtocolClientHandler;
    4. import io.netty.bootstrap.Bootstrap;
    5. import io.netty.channel.ChannelFuture;
    6. import io.netty.channel.ChannelInitializer;
    7. import io.netty.channel.ChannelPipeline;
    8. import io.netty.channel.EventLoopGroup;
    9. import io.netty.channel.nio.NioEventLoopGroup;
    10. import io.netty.channel.socket.SocketChannel;
    11. import io.netty.channel.socket.nio.NioSocketChannel;
    12. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    13. import io.netty.handler.codec.LengthFieldPrepender;
    14. import java.io.IOException;
    15. /**
    16. * Netty自定义编码和解码 客户端
    17. */
    18. public class ProtocolClient {
    19. public static void main(String[] args) throws IOException {
    20. ProtocolClient client = new ProtocolClient("127.0.0.1", 6666);
    21. }
    22. public ProtocolClient(String host, int port) {
    23. EventLoopGroup group = new NioEventLoopGroup(1);
    24. Bootstrap bootstrap = new Bootstrap();
    25. bootstrap.group(group);
    26. bootstrap.channel(NioSocketChannel.class);
    27. bootstrap.remoteAddress(host, port);
    28. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    29. @Override
    30. protected void initChannel(SocketChannel ch) throws Exception {
    31. ChannelPipeline pipeline = ch.pipeline();
    32. //netty的粘包和拆包
    33. pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
    34. pipeline.addLast(new LengthFieldPrepender(4));
    35. //自定义的编码和解码
    36. pipeline.addLast(new RpcDecoder());
    37. pipeline.addLast(new RpcEncoder());
    38. //业务处理的handler
    39. pipeline.addLast(ProtocolClientHandler.INSTANCE);
    40. }
    41. });
    42. try {
    43. ChannelFuture f = bootstrap.connect().sync();
    44. f.channel().closeFuture().sync();
    45. } catch (Exception e) {
    46. e.printStackTrace();
    47. } finally {
    48. System.out.println("连接关闭,资源释放");
    49. group.shutdownGracefully();
    50. }
    51. }
    52. }

    ProtocolClientHandler

    1. import com.mytest.protocol.codec.RpcMessage;
    2. import com.mytest.protocol.codec.RpcMessageType;
    3. import io.netty.channel.ChannelHandler;
    4. import io.netty.channel.ChannelHandlerContext;
    5. import io.netty.channel.ChannelInboundHandlerAdapter;
    6. @ChannelHandler.Sharable
    7. public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {
    8. public static final ProtocolClientHandler INSTANCE = new ProtocolClientHandler();
    9. @Override
    10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    11. //调用远程,传输一个java对象
    12. RpcMessage rpcMessage = new RpcMessage();
    13. rpcMessage.setContent("消息");
    14. rpcMessage.setSender("发送者");
    15. StringBuffer stringBuffer = new StringBuffer();
    16. for (int i=0; i<100; i++) {
    17. stringBuffer.append("这是一个消息字名字的");
    18. }
    19. rpcMessage.setReceiver(stringBuffer.toString());
    20. rpcMessage.setRpcMessageType(RpcMessageType.LOGIN);
    21. rpcMessage.setTime(System.currentTimeMillis());
    22. for (int i=0; i<20; i++) {
    23. ctx.writeAndFlush(rpcMessage);
    24. }
    25. System.out.println("启动后消息发送完毕");
    26. }
    27. @Override
    28. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    29. System.out.println("客户端接收");
    30. if (msg instanceof RpcMessage) {
    31. RpcMessage rpcMessage = (RpcMessage) msg;
    32. System.out.println("客户端接收到的消息:" + rpcMessage);
    33. }
    34. }
    35. @Override
    36. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    37. ctx.flush();
    38. }
    39. @Override
    40. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    41. cause.printStackTrace();
    42. ctx.close();
    43. }
    44. }

    编解码器

    RpcDecoder 、RpcEncoder

    1. /**
    2. * 网络通信传输的 字节流数据 的解码器(字节流解码成程序能识别的数据(比如解码成一个java对象))
    3. *
    4. */
    5. public class RpcDecoder extends ByteToMessageDecoder {
    6. @Override
    7. protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
    8. //复制一份 字节流数据
    9. ByteBuf frame = byteBuf.retainedDuplicate();
    10. //把字节流数据转成字符串
    11. final String content = frame.toString(CharsetUtil.UTF_8);
    12. //把字符串通过fastjson转成java对象
    13. RpcMessage message = JSON.parseObject(content, RpcMessage.class);
    14. //把得到的java对象传给下一个handler
    15. out.add(message);
    16. //buf的数据已经读取过了,跳过已经读取的数据,更新一下buf的index下标
    17. byteBuf.skipBytes(byteBuf.readableBytes());
    18. }
    19. }
    20. import com.alibaba.fastjson.JSON;
    21. import io.netty.buffer.ByteBuf;
    22. import io.netty.channel.ChannelHandlerContext;
    23. import io.netty.handler.codec.MessageToByteEncoder;
    24. import io.netty.util.CharsetUtil;
    25. public class RpcEncoder extends MessageToByteEncoder<RpcMessage> {
    26. @Override
    27. protected void encode(ChannelHandlerContext ctx, RpcMessage msg, ByteBuf out) throws Exception {
    28. out.writeBytes(JSON.toJSONString(msg).getBytes(CharsetUtil.UTF_8));
    29. }
    30. }
    31. @Data
    32. public class RpcMessage {
    33. private RpcMessageType rpcMessageType; //消息类型[LOGIN]或者[SYSTEM]或者[LOGOUT]
    34. private long time; //消息发送时间
    35. private String sender; //发送人
    36. private String receiver; //接收人
    37. private Object content; //消息内容
    38. }
    39. public enum RpcMessageType {
    40. /**
    41. * 系统消息
    42. */
    43. SYSTEM("SYSTEM"),
    44. /**
    45. * 登录消息
    46. */
    47. LOGIN("LOGIN"),
    48. /**
    49. * 登出消息
    50. */
    51. LOGOUT("LOGOUT"),
    52. /**
    53. * 调用消息
    54. */
    55. INVOKE("INVOKE");
    56. private String name;
    57. public static boolean isRpcMessageType(String content) {
    58. return content.matches("^\\[(SYSTEM|LOGIN|LOGIN|INVOKE)\\]");
    59. }
    60. RpcMessageType(String name) {
    61. this.name = name;
    62. }
    63. public String getName() {
    64. return this.name;
    65. }
    66. public String toString() {
    67. return this.name;
    68. }
    69. }

发表评论

表情:
评论列表 (有 0 条评论,64人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty解码器

    Netty 编解码 在网络中数据是以字节码二进制的形式传输的,所以我们在使用 Netty 传输数据的时候,需要将我们传输的数据转换为 二进制的形式 进行传输,所以不管是我

    相关 Netty——编解码器

    什么是编解码器 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和 目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器