《序列化与自定义Request、Response编解码器》

「爱情、让人受尽委屈。」 2022-02-23 02:50 499阅读 0赞

《序列化与自定义Request、Response编解码器》

  • 序列化是如何实现的
    • 序列化三种底层实现方式
      • 使用JDK的ByteArrayOutputStream序列化(需要自己手写大小端转字节序列的函数)
      • 用NIO中的Bytebuf(不能自动扩容)
      • 使用Netty中的ChannelBuffer
    • 序列化对象
      • 使用继承Serializer抽象类
      • 采用Protobuf序列化框架
  • 序列化协议在编解码器中的应用
    • 如何自定义写一个编解码器应用在数据传输中
      • 先实现数据包
      • Request的编解码器
        • RequestEncoder
        • RequestDecoder
      • response编解码器
        • ResponseEncoder
        • ResponseDecoder
      • 测试自定义编解码器

序列化是如何实现的

对象的序列化主要有两种用途:

1) 把对象的字节序列永久地保存到硬盘上,通常存放在一个文件中;

2) 在网络上传送对象的字节序列。

  在很多应用中,需要对某些对象进行序列化,让它们离开内存空间,入住物理硬盘,以便长期保存。比如最常见的是Web服务器中的Session对象,当有 10万用户并发访问,就有可能出现10万个Session对象,内存可能吃不消,于是Web容器就会把一些seesion先序列化到硬盘中,等要用了,再把保存在硬盘中的对象还原到内存中。
  当两个进程在进行远程通信时,彼此可以发送各种类型的数据。无论是何种类型的数据,都会以二进制序列的形式在网络上传送。发送方需要把这个Java对象转换为字节序列,才能在网络上传送;接收方则需要把字节序列再恢复为Java对象。
  java序列化是java编解码技术中的一种,不同的编解码的优缺点,参考之前的博客。https://blog.csdn.net/weixin_41262453/article/details/88980701

序列化三种底层实现方式

使用JDK的ByteArrayOutputStream序列化(需要自己手写大小端转字节序列的函数)

  ByteArrayOutputStream类实现了将数据写入字节数组的输出流当数据写入缓冲区时,缓冲区会自动增长。ByteArrayOutputStream.write(int b) 将指定的字节写入此字节数组输出流。 此方式需要自己手写大小端转字节序列的函数,麻烦。

  1. public class Test1 {
  2. public static void main(String[] args) throws IOException {
  3. int id = 101;
  4. int age = 21;
  5. ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
  6. arrayOutputStream.write(int2bytes(id));
  7. arrayOutputStream.write(int2bytes(age));
  8. byte[] byteArray = arrayOutputStream.toByteArray();
  9. System.out.println(Arrays.toString(byteArray));
  10. ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray);
  11. byte[] idBytes = new byte[4];
  12. arrayInputStream.read(idBytes);
  13. System.out.println("id:" + bytes2int(idBytes));
  14. byte[] ageBytes = new byte[4];
  15. arrayInputStream.read(ageBytes);
  16. System.out.println("age:" + bytes2int(ageBytes));
  17. }
  18. /** * 大端字节序列(先写高位,再写低位) * 百度下 大小端字节序列 * @param i * @return */
  19. public static byte[] int2bytes(int i){
  20. byte[] bytes = new byte[4];
  21. bytes[0] = (byte)(i >> 3*8);
  22. bytes[1] = (byte)(i >> 2*8);
  23. bytes[2] = (byte)(i >> 1*8);
  24. bytes[3] = (byte)(i >> 0*8);
  25. return bytes;
  26. }
  27. /** * 大端 * @param bytes * @return */
  28. public static int bytes2int(byte[] bytes){
  29. return (bytes[0] << 3*8) |
  30. (bytes[1] << 2*8) |
  31. (bytes[2] << 1*8) |
  32. (bytes[3] << 0*8);
  33. }
  34. }

运行结果:
                在这里插入图片描述

用NIO中的Bytebuf(不能自动扩容)

  使用NIO中的字节缓冲区类Bytebuf,提供了许多序列化方法。使用方便,但是在使用时要定义大小,不能自动扩容。

  1. public class Test2 {
  2. public static void main(String[] args) {
  3. int id = 101;
  4. int age = 21;
  5. ByteBuffer buffer = ByteBuffer.allocate(8);
  6. buffer.putInt(id);
  7. buffer.putInt(age);
  8. byte[] array = buffer.array();
  9. System.out.println(Arrays.toString(buffer.array()));
  10. //反序列化
  11. ByteBuffer buffer2 = ByteBuffer.wrap(array);
  12. System.out.println("id:"+buffer2.getInt());
  13. System.out.println("age:"+buffer2.getInt());
  14. }
  15. }

运行结果:
           在这里插入图片描述

使用Netty中的ChannelBuffer

  使用时需要导入Netty的jar包。

  1. public class Test3 {
  2. public static void main(String[] args) {
  3. ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
  4. buffer.writeInt(101);
  5. buffer.writeDouble(80.1);
  6. byte[] bytes = new byte[buffer.writerIndex()];
  7. buffer.readBytes(bytes);
  8. System.out.println(Arrays.toString(bytes));
  9. "abc".getBytes();
  10. //=====================反序列化===========================
  11. ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(bytes);
  12. System.out.println(wrappedBuffer.readInt());
  13. System.out.println(wrappedBuffer.readDouble());
  14. }
  15. }

运行结果:
          在这里插入图片描述

序列化对象

使用继承Serializer抽象类

  Serializer底层的实现就是两个ChannelBuffer, writeBuffer和readBuffer。
  把对象转换为字节序列的过程称为对象的序列化;把字节序列恢复为对象的过程称为对象的反序列化。需要序列化的两个类:Player 和Resource ,继承Serializer抽象类要做的事情就是重写write和read方法。

  1. public class Player extends Serializer{
  2. private long playerId;
  3. private int age;
  4. private List<Integer> skills = new ArrayList<>();
  5. private Resource resource = new Resource();
  6. public Resource getResource() {
  7. return resource;
  8. }
  9. public void setResource(Resource resource) {
  10. this.resource = resource;
  11. }
  12. public long getPlayerId() {
  13. return playerId;
  14. }
  15. public void setPlayerId(long playerId) {
  16. this.playerId = playerId;
  17. }
  18. public int getAge() {
  19. return age;
  20. }
  21. public void setAge(int age) {
  22. this.age = age;
  23. }
  24. public List<Integer> getSkills() {
  25. return skills;
  26. }
  27. public void setSkills(List<Integer> skills) {
  28. this.skills = skills;
  29. }
  30. @Override
  31. protected void read() {
  32. this.playerId = readLong();
  33. this.age = readInt();
  34. this.skills = readList(Integer.class);
  35. this.resource = read(Resource.class);
  36. }
  37. @Override
  38. protected void write() {
  39. writeLong(playerId);
  40. writeInt(age);
  41. writeList(skills);
  42. writeObject(resource);
  43. }
  44. }
  45. public class Resource extends Serializer {
  46. private int gold;
  47. public int getGold() {
  48. return gold;
  49. }
  50. public void setGold(int gold) {
  51. this.gold = gold;
  52. }
  53. @Override
  54. protected void read() {
  55. this.gold = readInt();
  56. }
  57. @Override
  58. protected void write() {
  59. writeInt(gold);
  60. }
  61. }

测试序列化对象:

  1. public class Test4 {
  2. public static void main(String[] args) {
  3. Player player = new Player();
  4. player.setPlayerId(10001);
  5. player.setAge(22);
  6. player.getSkills().add(101);
  7. player.getResource().setGold(99999);
  8. byte[] bytes = player.getBytes();
  9. System.out.println(Arrays.toString(bytes));
  10. //==============================================
  11. Player player2 = new Player();
  12. player2.readFromBytes(bytes);
  13. System.out.println(player2.getPlayerId() + " "+player2.getAge() + " "+ Arrays.toString(player2.getSkills().toArray())+" " +player2.getResource().getGold());
  14. }
  15. }

运行结果:
在这里插入图片描述

采用Protobuf序列化框架

  Protobuf系列化操作参考https://blog.csdn.net/weixin\_41262453/article/details/88980701\#Google\_Protobuf\_451 ,将上面的player序列化,进行测试。
  player.proto配置文件参考
              在这里插入图片描述
序列化后得到的PlayerModule有1000多行就不放了,测试代码如下:

  1. import java.util.Arrays;
  2. import com.proto.PlayerModule.PBPlayer;
  3. import com.proto.PlayerModule.PBPlayer.Builder;
  4. public class PB2Bytes {
  5. public static void main(String[] args) throws Exception {
  6. byte[] bytes = toBytes();
  7. toPlayer(bytes);
  8. }
  9. /** * 序列化 */
  10. public static byte[] toBytes(){
  11. //获取一个PBPlayer的构造器
  12. Builder builder = PlayerModule.PBPlayer.newBuilder();
  13. //设置数据
  14. builder.setPlayerId(101).setAge(20).setName("peter").addSkills(1001);
  15. //构造出对象
  16. PBPlayer player = builder.build();
  17. //序列化成字节数组
  18. byte[] byteArray = player.toByteArray();
  19. System.out.println(Arrays.toString(byteArray));
  20. return byteArray;
  21. }
  22. /** * 反序列化 * @param bs * @throws Exception */
  23. public static void toPlayer(byte[] bs) throws Exception{
  24. PBPlayer player = PlayerModule.PBPlayer.parseFrom(bs);
  25. System.out.println("playerId:" + player.getPlayerId());
  26. System.out.println("age:" + player.getAge());
  27. System.out.println("name:" + player.getName());
  28. System.out.println("skills:" + (Arrays.toString(player.getSkillsList().toArray())));
  29. }
  30. }

运行结果:
        在这里插入图片描述

序列化协议在编解码器中的应用

  此处提供了一个自定义的编解码器用于传输我们已经序列化的对象,可用于了解常见通信框架的编解码器底层序列化工作原理。

如何自定义写一个编解码器应用在数据传输中

先实现数据包

  首先设计数据包的格式:

  1. * 数据包格式
  2. * +——----——+——-----——+——----——+——----——+——-----——+
  3. * | 包头 | 模块号 | 命令号 | 长度 | 数据 |
  4. * +——----——+——-----——+——----——+——----——+——-----——+
  5. * </pre>
  6. * 包头4字节
  7. * 模块号2字节short
  8. * 命令号2字节short
  9. * 长度4字节(描述数据部分字节长度)

  因此我们要定义一个包头,首先采用一个不常用的4字节数据作为包头:

  1. public interface ConstantValue {
  2. /** * 包头 */
  3. public static final int FLAG = -32523523;
  4. }

  其次定义请求,请求中包括请求模块、命令号、数据

  1. public class Request {
  2. /** * 请求模块 */
  3. private short module;
  4. /** * 命令号 */
  5. private short cmd;
  6. /** * 数据部分 */
  7. private byte[] data;
  8. public short getModule() {
  9. return module;
  10. }
  11. public void setModule(short module) {
  12. this.module = module;
  13. }
  14. public short getCmd() {
  15. return cmd;
  16. }
  17. public void setCmd(short cmd) {
  18. this.cmd = cmd;
  19. }
  20. public byte[] getData() {
  21. return data;
  22. }
  23. public void setData(byte[] data) {
  24. this.data = data;
  25. }
  26. public int getDataLength(){
  27. if(data == null){
  28. return 0;
  29. }
  30. return data.length;
  31. }
  32. }

响应的有响应:Response

  1. public class Response {
  2. /** * 请求模块 */
  3. private short module;
  4. /** * 命令号 */
  5. private short cmd;
  6. /** * 状态码 */
  7. private int stateCode;
  8. /** * 数据部分 */
  9. private byte[] data;
  10. public short getModule() {
  11. return module;
  12. }
  13. public void setModule(short module) {
  14. this.module = module;
  15. }
  16. public short getCmd() {
  17. return cmd;
  18. }
  19. public void setCmd(short cmd) {
  20. this.cmd = cmd;
  21. }
  22. public int getStateCode() {
  23. return stateCode;
  24. }
  25. public void setStateCode(int stateCode) {
  26. this.stateCode = stateCode;
  27. }
  28. public byte[] getData() {
  29. return data;
  30. }
  31. public void setData(byte[] data) {
  32. this.data = data;
  33. }
  34. public int getDataLength(){
  35. if(data == null){
  36. return 0;
  37. }
  38. return data.length;
  39. }
  40. }

返回的状态字:

  1. public interface StateCode {
  2. /** * 成功 */
  3. public static int SUCCESS = 0;
  4. /** * 失败 */
  5. public static int FAIL = 1;
  6. }

Request的编解码器

RequestEncoder

  RequestEncoder首先需要继承Netty3.10.5中OneToOneEncoder,RequestEncoder将请求Request对象实现序列化到ChannelBuffer ,返回ChannelBuffer 缓存区buffer。

  1. /** * 请求编码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 | 模块号 | 命令号 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */
  2. public class RequestEncoder extends OneToOneEncoder{
  3. @Override
  4. protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {
  5. Request request = (Request)(rs);
  6. ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
  7. //包头
  8. buffer.writeInt(ConstantValue.FLAG);
  9. //module
  10. buffer.writeShort(request.getModule());
  11. //cmd
  12. buffer.writeShort(request.getCmd());
  13. //长度
  14. buffer.writeInt(request.getDataLength());
  15. //data
  16. if(request.getData() != null){
  17. buffer.writeBytes(request.getData());
  18. }
  19. return buffer;
  20. }
  21. }

RequestDecoder

  RequestDecoder同样要继承Netty3.10.5中FrameDecoder,Request解码器要做的事情就是从接收到的字节流缓存区读取出数据,进行判断,若包头正确,可读长度大于基本长度,则将字节流依次读取出Request类的各个成员变量,返回Request对象。

  1. /** * 请求解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 | 模块号 | 命令号 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) * * */
  2. public class RequestDecoder extends FrameDecoder{
  3. /** * 数据包基本长度 */
  4. public static int BASE_LENTH = 4 + 2 + 2 + 4;
  5. @Override
  6. protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {
  7. //可读长度必须大于基本长度
  8. if(buffer.readableBytes() >= BASE_LENTH){
  9. //防止socket字节流攻击
  10. if(buffer.readableBytes() > 2048){
  11. buffer.skipBytes(buffer.readableBytes());
  12. }
  13. //记录包头开始的index
  14. int beginReader;
  15. while(true){
  16. beginReader = buffer.readerIndex();
  17. buffer.markReaderIndex();
  18. if(buffer.readInt() == ConstantValue.FLAG){
  19. break;
  20. }
  21. //未读到包头,略过一个字节
  22. buffer.resetReaderIndex();
  23. buffer.readByte();
  24. //长度又变得不满足
  25. if(buffer.readableBytes() < BASE_LENTH){
  26. return null;
  27. }
  28. }
  29. //模块号
  30. short module = buffer.readShort();
  31. //命令号
  32. short cmd = buffer.readShort();
  33. //长度
  34. int length = buffer.readInt();
  35. //判断请求数据包数据是否到齐
  36. if(buffer.readableBytes() < length){
  37. //还原读指针
  38. buffer.readerIndex(beginReader);
  39. return null;
  40. }
  41. //读取data数据
  42. byte[] data = new byte[length];
  43. buffer.readBytes(data);
  44. Request request = new Request();
  45. request.setModule(module);
  46. request.setCmd(cmd);
  47. request.setData(data);
  48. //继续往下传递
  49. return request;
  50. }
  51. //数据包不完整,需要等待后面的包来
  52. return null;
  53. }
  54. }

response编解码器

ResponseEncoder

  ResponseEncoder与RequestEncoder实现原理类似。

  1. /** * 请求编码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * | 包头 | 模块号 | 命令号 | 状态码 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */
  2. public class ResponseEncoder extends OneToOneEncoder{
  3. @Override
  4. protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {
  5. Response response = (Response)(rs);
  6. ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
  7. //包头
  8. buffer.writeInt(ConstantValue.FLAG);
  9. //module
  10. buffer.writeShort(response.getModule());
  11. //cmd
  12. buffer.writeShort(response.getCmd());
  13. //状态码
  14. buffer.writeInt(response.getStateCode());
  15. //长度
  16. buffer.writeInt(response.getDataLength());
  17. //data
  18. if(response.getData() != null){
  19. buffer.writeBytes(response.getData());
  20. }
  21. return buffer;
  22. }
  23. }

ResponseDecoder

  ResponseDecoder相比RequestDecoder,还会多解析一个状态码。

  1. /** * response解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * | 包头 | 模块号 | 命令号 | 状态码 | 长度 | 数据 | * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */
  2. public class ResponseDecoder extends FrameDecoder{
  3. /** * 数据包基本长度 */
  4. public static int BASE_LENTH = 4 + 2 + 2 + 4;
  5. @Override
  6. protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {
  7. //可读长度必须大于基本长度
  8. if(buffer.readableBytes() >= BASE_LENTH){
  9. //记录包头开始的index
  10. int beginReader = buffer.readerIndex();
  11. while(true){
  12. if(buffer.readInt() == ConstantValue.FLAG){
  13. break;
  14. }
  15. }
  16. //模块号
  17. short module = buffer.readShort();
  18. //命令号
  19. short cmd = buffer.readShort();
  20. //状态码
  21. int stateCode = buffer.readInt();
  22. //长度
  23. int length = buffer.readInt();
  24. if(buffer.readableBytes() < length){
  25. //还原读指针
  26. buffer.readerIndex(beginReader);
  27. return null;
  28. }
  29. byte[] data = new byte[length];
  30. buffer.readBytes(data);
  31. Response response = new Response();
  32. response.setModule(module);
  33. response.setCmd(cmd);
  34. response.setStateCode(stateCode);
  35. response.setData(data);
  36. //继续往下传递
  37. return response;
  38. }
  39. //数据包不完整,需要等待后面的包来
  40. return null;
  41. }
  42. }

测试自定义编解码器

  发送的对象模型:FightRequest

  1. public class FightRequest extends Serializer{
  2. /** * 副本id */
  3. private int fubenId;
  4. /** * 次数 */
  5. private int count;
  6. public int getFubenId() {
  7. return fubenId;
  8. }
  9. public void setFubenId(int fubenId) {
  10. this.fubenId = fubenId;
  11. }
  12. public int getCount() {
  13. return count;
  14. }
  15. public void setCount(int count) {
  16. this.count = count;
  17. }
  18. @Override
  19. protected void read() {
  20. this.fubenId = readInt();
  21. this.count = readInt();
  22. }
  23. @Override
  24. protected void write() {
  25. writeInt(fubenId);
  26. writeInt(count);
  27. }
  28. }

  返回的对象模型:

  1. public class FightResponse extends Serializer{
  2. /** * 获取金币 */
  3. private int gold;
  4. public int getGold() {
  5. return gold;
  6. }
  7. public void setGold(int gold) {
  8. this.gold = gold;
  9. }
  10. @Override
  11. protected void read() {
  12. this.gold = readInt();
  13. }
  14. @Override
  15. protected void write() {
  16. writeInt(gold);
  17. }
  18. }

  测试代码:服务端Client和HiHandler ,由于Request和Response的Encoder和Decoder都是继承了Netty中的类进行设计的,所以可以直接pipeline.addLast,因此兼容原有的Netty框架。并且在channel.write(request)时也是直接write request对象,request对象中的data就是需要发送的fightRequest模型的序列化字节码。

  1. public class Client {
  2. public static void main(String[] args) throws InterruptedException {
  3. //服务类
  4. ClientBootstrap bootstrap = new ClientBootstrap();
  5. //线程池
  6. ExecutorService boss = Executors.newCachedThreadPool();
  7. ExecutorService worker = Executors.newCachedThreadPool();
  8. //socket工厂
  9. bootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
  10. //管道工厂
  11. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  12. @Override
  13. public ChannelPipeline getPipeline() throws Exception {
  14. ChannelPipeline pipeline = Channels.pipeline();
  15. pipeline.addLast("decoder", new ResponseDecoder());
  16. pipeline.addLast("encoder", new RequestEncoder());
  17. pipeline.addLast("hiHandler", new HiHandler());
  18. return pipeline;
  19. }
  20. });
  21. //连接服务端
  22. ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 30000));
  23. Channel channel = connect.sync().getChannel();
  24. System.out.println("client start");
  25. Scanner scanner = new Scanner(System.in);
  26. while(true){
  27. System.out.println("请输入");
  28. int fubenId = Integer.parseInt(scanner.nextLine());
  29. int count = Integer.parseInt(scanner.nextLine());
  30. FightRequest fightRequest = new FightRequest();
  31. fightRequest.setFubenId(fubenId);
  32. fightRequest.setCount(count);
  33. Request request = new Request();
  34. request.setModule((short) 1);
  35. request.setCmd((short) 1);
  36. request.setData(fightRequest.getBytes());
  37. //发送请求
  38. channel.write(request);
  39. }
  40. }
  41. }
  42. public class HiHandler extends SimpleChannelHandler {
  43. /** * 接收消息 */
  44. @Override
  45. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
  46. Response message = (Response)e.getMessage();
  47. if(message.getModule() == 1){
  48. if(message.getCmd() == 1){
  49. FightResponse fightResponse = new FightResponse();
  50. fightResponse.readFromBytes(message.getData());
  51. System.out.println("gold:" + fightResponse.getGold());
  52. }else if(message.getCmd() == 2){
  53. }
  54. }else if (message.getModule() == 1){
  55. }
  56. }
  57. /** * 捕获异常 */
  58. @Override
  59. public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
  60. System.out.println("exceptionCaught");
  61. super.exceptionCaught(ctx, e);
  62. }
  63. /** * 新连接 */
  64. @Override
  65. public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  66. System.out.println("channelConnected");
  67. super.channelConnected(ctx, e);
  68. }
  69. /** * 必须是链接已经建立,关闭通道的时候才会触发 */
  70. @Override
  71. public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  72. System.out.println("channelDisconnected");
  73. super.channelDisconnected(ctx, e);
  74. }
  75. /** * channel关闭的时候触发 */
  76. @Override
  77. public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  78. System.out.println("channelClosed");
  79. super.channelClosed(ctx, e);
  80. }
  81. }

  客户端Server 和HelloHandler:messageReceived中接收的消息此时就是Request对象,从Request对象的data取出的数据是序列化的fightRequest 字节码,要再经过该类中的readFromBytes方法转换,才能完全转成发送过来的fightRequest 。

  1. public class Server {
  2. public static void main(String[] args) {
  3. //服务类
  4. ServerBootstrap bootstrap = new ServerBootstrap();
  5. //boss线程监听端口,worker线程负责数据读写
  6. ExecutorService boss = Executors.newCachedThreadPool();
  7. ExecutorService worker = Executors.newCachedThreadPool();
  8. //设置niosocket工厂
  9. bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
  10. //设置管道的工厂
  11. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  12. @Override
  13. public ChannelPipeline getPipeline() throws Exception {
  14. ChannelPipeline pipeline = Channels.pipeline();
  15. pipeline.addLast("decoder", new RequestDecoder());
  16. pipeline.addLast("encoder", new ResponseEncoder());
  17. pipeline.addLast("helloHandler", new HelloHandler());
  18. return pipeline;
  19. }
  20. });
  21. bootstrap.bind(new InetSocketAddress(30000));
  22. System.out.println("start!!!");
  23. }
  24. }
  25. public class HelloHandler extends SimpleChannelHandler {
  26. /** * 接收消息 */
  27. @Override
  28. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
  29. Request message = (Request)e.getMessage();
  30. if(message.getModule() == 1){
  31. if(message.getCmd() == 1){
  32. FightRequest fightRequest = new FightRequest();
  33. fightRequest.readFromBytes(message.getData());
  34. System.out.println("fubenId:" +fightRequest.getFubenId() + " " + "count:" + fightRequest.getCount());
  35. //回写数据
  36. FightResponse fightResponse = new FightResponse();
  37. fightResponse.setGold(9999);
  38. Response response = new Response();
  39. response.setModule((short) 1);
  40. response.setCmd((short) 1);
  41. response.setStateCode(StateCode.SUCCESS);
  42. response.setData(fightResponse.getBytes());
  43. ctx.getChannel().write(response);
  44. }else if(message.getCmd() == 2){
  45. }
  46. }else if (message.getModule() == 1){
  47. }
  48. }
  49. /** * 捕获异常 */
  50. @Override
  51. public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
  52. System.out.println("exceptionCaught");
  53. super.exceptionCaught(ctx, e);
  54. }
  55. /** * 新连接 */
  56. @Override
  57. public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  58. System.out.println("channelConnected");
  59. super.channelConnected(ctx, e);
  60. }
  61. /** * 必须是链接已经建立,关闭通道的时候才会触发 */
  62. @Override
  63. public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  64. System.out.println("channelDisconnected");
  65. super.channelDisconnected(ctx, e);
  66. }
  67. /** * channel关闭的时候触发 */
  68. @Override
  69. public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  70. System.out.println("channelClosed");
  71. super.channelClosed(ctx, e);
  72. }
  73. }

运行测试:
              在这里插入图片描述
服务器端:
              在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,499人围观)

还没有评论,来说两句吧...

相关阅读