由浅入深Netty简易实现RPC框架

叁歲伎倆 2024-03-17 09:13 180阅读 0赞

目录

    • 1 准备工作
    • 2 服务器 handler
    • 3 客户端代码第一版
    • 4 客户端 handler 第一版
    • 5 客户端代码 第二版
    • 6 客户端 handler 第二版

1 准备工作

在这里插入图片描述

这些代码可以认为是现成的,无需从头编写练习

为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

  1. @Data
  2. public abstract class Message implements Serializable {
  3. // 省略旧的代码
  4. public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
  5. public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
  6. static {
  7. // ...
  8. messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
  9. messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
  10. }
  11. }

请求消息

  1. @Getter
  2. @ToString(callSuper = true)
  3. public class RpcRequestMessage extends Message {
  4. /**
  5. * 调用的接口全限定名,服务端根据它找到实现
  6. */
  7. private String interfaceName;
  8. /**
  9. * 调用接口中的方法名
  10. */
  11. private String methodName;
  12. /**
  13. * 方法返回类型
  14. */
  15. private Class<?> returnType;
  16. /**
  17. * 方法参数类型数组
  18. */
  19. private Class[] parameterTypes;
  20. /**
  21. * 方法参数值数组
  22. */
  23. private Object[] parameterValue;
  24. public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
  25. super.setSequenceId(sequenceId);
  26. this.interfaceName = interfaceName;
  27. this.methodName = methodName;
  28. this.returnType = returnType;
  29. this.parameterTypes = parameterTypes;
  30. this.parameterValue = parameterValue;
  31. }
  32. @Override
  33. public int getMessageType() {
  34. return RPC_MESSAGE_TYPE_REQUEST;
  35. }
  36. }

响应消息

  1. @Data
  2. @ToString(callSuper = true)
  3. public class RpcResponseMessage extends Message {
  4. /**
  5. * 返回值
  6. */
  7. private Object returnValue;
  8. /**
  9. * 异常值
  10. */
  11. private Exception exceptionValue;
  12. @Override
  13. public int getMessageType() {
  14. return RPC_MESSAGE_TYPE_RESPONSE;
  15. }
  16. }

服务器架子

  1. @Slf4j
  2. public class RpcServer {
  3. public static void main(String[] args) {
  4. NioEventLoopGroup boss = new NioEventLoopGroup();
  5. NioEventLoopGroup worker = new NioEventLoopGroup();
  6. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  7. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  8. // rpc 请求消息处理器,待实现
  9. RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
  10. try {
  11. ServerBootstrap serverBootstrap = new ServerBootstrap();
  12. serverBootstrap.channel(NioServerSocketChannel.class);
  13. serverBootstrap.group(boss, worker);
  14. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  15. @Override
  16. protected void initChannel(SocketChannel ch) throws Exception {
  17. ch.pipeline().addLast(new ProcotolFrameDecoder());
  18. ch.pipeline().addLast(LOGGING_HANDLER);
  19. ch.pipeline().addLast(MESSAGE_CODEC);
  20. ch.pipeline().addLast(RPC_HANDLER);
  21. }
  22. });
  23. Channel channel = serverBootstrap.bind(8080).sync().channel();
  24. channel.closeFuture().sync();
  25. } catch (InterruptedException e) {
  26. log.error("server error", e);
  27. } finally {
  28. boss.shutdownGracefully();
  29. worker.shutdownGracefully();
  30. }
  31. }
  32. }

客户端架子

  1. public class RpcClient {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup group = new NioEventLoopGroup();
  4. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  5. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  6. // rpc 响应消息处理器,待实现
  7. RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
  8. try {
  9. Bootstrap bootstrap = new Bootstrap();
  10. bootstrap.channel(NioSocketChannel.class);
  11. bootstrap.group(group);
  12. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel ch) throws Exception {
  15. ch.pipeline().addLast(new ProcotolFrameDecoder());
  16. ch.pipeline().addLast(LOGGING_HANDLER);
  17. ch.pipeline().addLast(MESSAGE_CODEC);
  18. ch.pipeline().addLast(RPC_HANDLER);
  19. }
  20. });
  21. Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
  22. channel.closeFuture().sync();
  23. } catch (Exception e) {
  24. log.error("client error", e);
  25. } finally {
  26. group.shutdownGracefully();
  27. }
  28. }
  29. }

服务器端的 service 获取

  1. public class ServicesFactory {
  2. static Properties properties;
  3. static Map<Class<?>, Object> map = new ConcurrentHashMap<>();
  4. static {
  5. try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
  6. properties = new Properties();
  7. properties.load(in);
  8. Set<String> names = properties.stringPropertyNames();
  9. for (String name : names) {
  10. if (name.endsWith("Service")) {
  11. Class<?> interfaceClass = Class.forName(name);
  12. Class<?> instanceClass = Class.forName(properties.getProperty(name));
  13. map.put(interfaceClass, instanceClass.newInstance());
  14. }
  15. }
  16. } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
  17. throw new ExceptionInInitializerError(e);
  18. }
  19. }
  20. public static <T> T getService(Class<T> interfaceClass) {
  21. return (T) map.get(interfaceClass);
  22. }
  23. }

相关配置 application.properties

  1. serializer.algorithm=Json
  2. cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

2 服务器 handler

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
  4. @Override
  5. protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
  6. RpcResponseMessage response = new RpcResponseMessage();
  7. response.setSequenceId(message.getSequenceId());
  8. try {
  9. // 获取真正的实现对象
  10. HelloService service = (HelloService)
  11. ServicesFactory.getService(Class.forName(message.getInterfaceName()));
  12. // 获取要调用的方法
  13. Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
  14. // 调用方法
  15. Object invoke = method.invoke(service, message.getParameterValue());
  16. // 调用成功
  17. response.setReturnValue(invoke);
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. // 调用异常
  21. response.setExceptionValue(e);
  22. }
  23. // 返回结果
  24. ctx.writeAndFlush(response);
  25. }
  26. }

3 客户端代码第一版

只发消息

  1. @Slf4j
  2. public class RpcClient {
  3. public static void main(String[] args) {
  4. NioEventLoopGroup group = new NioEventLoopGroup();
  5. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  6. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  7. RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
  8. try {
  9. Bootstrap bootstrap = new Bootstrap();
  10. bootstrap.channel(NioSocketChannel.class);
  11. bootstrap.group(group);
  12. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel ch) throws Exception {
  15. ch.pipeline().addLast(new ProcotolFrameDecoder());
  16. ch.pipeline().addLast(LOGGING_HANDLER);
  17. ch.pipeline().addLast(MESSAGE_CODEC);
  18. ch.pipeline().addLast(RPC_HANDLER);
  19. }
  20. });
  21. Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
  22. ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
  23. 1,
  24. "cn.itcast.server.service.HelloService",
  25. "sayHello",
  26. String.class,
  27. new Class[]{
  28. String.class},
  29. new Object[]{
  30. "张三"}
  31. )).addListener(promise -> {
  32. if (!promise.isSuccess()) {
  33. Throwable cause = promise.cause();
  34. log.error("error", cause);
  35. }
  36. });
  37. channel.closeFuture().sync();
  38. } catch (Exception e) {
  39. log.error("client error", e);
  40. } finally {
  41. group.shutdownGracefully();
  42. }
  43. }
  44. }

4 客户端 handler 第一版

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
  4. @Override
  5. protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
  6. log.debug("{}", msg);
  7. }
  8. }

5 客户端代码 第二版

包括 channel 管理,代理,接收结果

  1. @Slf4j
  2. public class RpcClientManager {
  3. public static void main(String[] args) {
  4. HelloService service = getProxyService(HelloService.class);
  5. System.out.println(service.sayHello("zhangsan"));
  6. // System.out.println(service.sayHello("lisi"));
  7. // System.out.println(service.sayHello("wangwu"));
  8. }
  9. // 创建代理类
  10. public static <T> T getProxyService(Class<T> serviceClass) {
  11. ClassLoader loader = serviceClass.getClassLoader();
  12. Class<?>[] interfaces = new Class[]{
  13. serviceClass};
  14. // sayHello "张三"
  15. Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
  16. // 1. 将方法调用转换为 消息对象
  17. int sequenceId = SequenceIdGenerator.nextId();
  18. RpcRequestMessage msg = new RpcRequestMessage(
  19. sequenceId,
  20. serviceClass.getName(),
  21. method.getName(),
  22. method.getReturnType(),
  23. method.getParameterTypes(),
  24. args
  25. );
  26. // 2. 将消息对象发送出去
  27. getChannel().writeAndFlush(msg);
  28. // 3. 准备一个空 Promise 对象,来接收结果 指定 promise 对象异步接收结果线程
  29. DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
  30. RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);
  31. // promise.addListener(future -> {
  32. // // 线程
  33. // });
  34. // 4. 等待 promise 结果
  35. promise.await();
  36. if(promise.isSuccess()) {
  37. // 调用正常
  38. return promise.getNow();
  39. } else {
  40. // 调用失败
  41. throw new RuntimeException(promise.cause());
  42. }
  43. });
  44. return (T) o;
  45. }
  46. private static Channel channel = null;
  47. private static final Object LOCK = new Object();
  48. // 获取唯一的 channel 对象
  49. public static Channel getChannel() {
  50. if (channel != null) {
  51. return channel;
  52. }
  53. synchronized (LOCK) {
  54. // t2
  55. if (channel != null) {
  56. // t1
  57. return channel;
  58. }
  59. initChannel();
  60. return channel;
  61. }
  62. }
  63. // 初始化 channel 方法
  64. private static void initChannel() {
  65. NioEventLoopGroup group = new NioEventLoopGroup();
  66. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  67. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  68. RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
  69. Bootstrap bootstrap = new Bootstrap();
  70. bootstrap.channel(NioSocketChannel.class);
  71. bootstrap.group(group);
  72. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  73. @Override
  74. protected void initChannel(SocketChannel ch) throws Exception {
  75. ch.pipeline().addLast(new ProcotolFrameDecoder());
  76. ch.pipeline().addLast(LOGGING_HANDLER);
  77. ch.pipeline().addLast(MESSAGE_CODEC);
  78. ch.pipeline().addLast(RPC_HANDLER);
  79. }
  80. });
  81. try {
  82. channel = bootstrap.connect("localhost", 8080).sync().channel();
  83. channel.closeFuture().addListener(future -> {
  84. group.shutdownGracefully();
  85. });
  86. } catch (Exception e) {
  87. log.error("client error", e);
  88. }
  89. }
  90. }

6 客户端 handler 第二版

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
  4. // 序号 用来接收结果的 promise 对象
  5. public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
  6. @Override
  7. protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
  8. log.debug("{}", msg);
  9. // 拿到空的 promise
  10. Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
  11. if (promise != null) {
  12. Object returnValue = msg.getReturnValue();
  13. Exception exceptionValue = msg.getExceptionValue();
  14. if(exceptionValue != null) {
  15. promise.setFailure(exceptionValue);
  16. } else {
  17. promise.setSuccess(returnValue);
  18. }
  19. }
  20. }
  21. }

发表评论

表情:
评论列表 (有 0 条评论,180人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty高性能的RPC框架

    Netty是一个开源的、高性能的、异步事件驱动的网络通信框架,支持多种协议和编码解码器,能够帮助开发人员快速构建高性能、可扩展的网络应用程序。它的主要优势包括: 1. 异步

    相关 手写RPC框架(netty+zookeeper)

      RPC是什么?远程过程调用,过程就是业务处理、计算任务,像调用本地方法一样调用远程的过程。   RMI和RPC的区别是什么?RMI是远程方法调用,是oop领域中RPC的一

    相关 基于Netty实现简单的RPC框架

            Dubbo采用Netty作为基础通信组件,模仿Dubbo实现简单版的RPC框架。服务消费者和服务提供者约定接口和协议,用于远程调用和TCP请求验证。服务提供者作