基于Netty实现简单的RPC框架

本是古典 何须时尚 2022-08-29 10:28 379阅读 0赞

Netty实现RPC框架

关于 Netty 的线程模型,详见博客,传送地址:Netty线程模型

文章目录

  • Netty实现RPC框架
    • 一、RPC简介
    • 二、代码实现
        1. 需求介绍
        1. 实现步骤
        1. 公共代码
        1. 服务端代码
        1. 客户端代码

一、RPC简介

RPC全称为remote procedure call,即远程过程调用。借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式。

比如两台服务器A和B,A服务器上部署一个应用,B服务器上部署一个应用,A服务器上的应用想调用B服务器上的应用提供的方法。由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来表达调用的语义和传达调用的数据。

注意:RPC并不是一个具体的技术,而是指整个网络远程调用过程。

image-20210713170847318

二、代码实现

1. 需求介绍

要求用Netty实现一个简单的RPC框架,消费者和提供者约定接口和协议,消费者远程调用提供者的服务。

2. 实现步骤

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的提供者的方法,内部需要使用Netty进行数据通信。
  4. 提供者与消费者数据传输使用 json 字符串数据格式。
  5. 使用 Netty 集成 SpringBoot 环境实现。

image-20210713184235953

注意:服务端注解要使用在实现类上,用于标记服务是否对外暴露。

3. 公共代码

  1. 公共接口,消费者与提供者的约定

    public interface IUserService {

    1. /** * 根据ID查询用户 */
    2. User getById(int id);

    }

  2. 封装的请求对象

    @Data
    public class RpcRequest {

    1. /** * 请求对象的ID */
    2. private String requestId;
    3. /** * 要调用方法所在类名 */
    4. private String className;
    5. /** * 要调用的方法名 */
    6. private String methodName;
    7. /** * 要调用方法的参数类型 */
    8. private Class<?>[] parameterTypes;
    9. /** * 要调用方法的入参 */
    10. private Object[] parameters;

    }

  3. 封装的响应对象

    @Data
    public class RpcResponse {

    1. /** * 响应ID */
    2. private String requestId;
    3. /** * 错误信息 */
    4. private String error;
    5. /** * 返回的结果 */
    6. private Object result;

    }

  4. pojo类

    @Data
    public class User {

    1. private int id;
    2. private String name;

    }

4. 服务端代码

  1. 定义注解,用于暴露服务接口,暴露服务端的getUserById方法

    @Target(ElementType.TYPE) //用于类上
    @Retention(RetentionPolicy.RUNTIME) //在运行时可以获取到
    public @interface RpcService {
    }

  2. 服务端实现公共接口,客户端要调用的方法

    @RpcService //对外暴露,加自定义注解
    @Service
    public class UserServiceImpl implements IUserService {

    1. Map<Object, User> userMap = new HashMap();
    2. @Override
    3. public User getById(int id) {
    4. if (userMap.size() == 0) {
    5. User user1 = new User();
    6. user1.setId(1);
    7. user1.setName("张三");
    8. User user2 = new User();
    9. user2.setId(2);
    10. user2.setName("李四");
    11. userMap.put(user1.getId(), user1);
    12. userMap.put(user2.getId(), user2);
    13. }
    14. return userMap.get(id);
    15. }

    }

  3. 服务端代码

    /* Netty的服务端 启动服务端监听端口 实现Spring容器生命周期接口DisposableBean,在Spring容器关闭的时候本类中的资源也要关闭 */
    @Component
    public class NettyRpcServer implements DisposableBean {

    1. /** * 注入自定义的处理器 */
    2. @Autowired
    3. NettyServerHandler nettyServerHandler;
    4. // 定义boss和worker线程
    5. EventLoopGroup bossGroup = null;
    6. EventLoopGroup workerGroup = null;
    7. /** * 启动服务端,参数表示ip和端口号 */
    8. public void start(String host, int port) {
    9. try {
    10. //1.创建bossGroup和workerGroup
    11. bossGroup = new NioEventLoopGroup(1);
    12. workerGroup = new NioEventLoopGroup();
    13. //2.设置启动助手
    14. ServerBootstrap bootstrap = new ServerBootstrap();
    15. //3.设置启动参数
    16. bootstrap.group(bossGroup, workerGroup)
    17. .channel(NioServerSocketChannel.class)
    18. .childHandler(new ChannelInitializer<SocketChannel>() {
    19. @Override
    20. protected void initChannel(SocketChannel ch) throws Exception {
    21. //添加String的编解码器,使得客户端和服务器可以直接使用String类型进行交流
    22. ch.pipeline().addLast(new StringDecoder());
    23. ch.pipeline().addLast(new StringEncoder());
    24. //添加自定义处理器
    25. ch.pipeline().addLast(nettyServerHandler);
    26. }
    27. });
    28. //绑定ip和端口号
    29. ChannelFuture channelFuture = bootstrap.bind(host, port).sync();
    30. System.out.println("======Netty服务端启动成功======");
    31. //监听通道的关闭状态(此处并没有真正的关闭),阻塞直到close事件发生
    32. channelFuture.channel().closeFuture().sync();
    33. } catch (InterruptedException e) {
    34. e.printStackTrace();
    35. //关闭资源
    36. if (bossGroup != null) {
    37. bossGroup.shutdownGracefully();
    38. }
    39. if (workerGroup != null) {
    40. workerGroup.shutdownGracefully();
    41. }
    42. }
    43. }
    44. /** * 实现DisposableBean接口要重写的方法,Spring关闭的时候执行 */
    45. @Override
    46. public void destroy() throws Exception {
    47. //关闭资源
    48. if (bossGroup != null) {
    49. bossGroup.shutdownGracefully();
    50. }
    51. if (workerGroup != null) {
    52. workerGroup.shutdownGracefully();
    53. }
    54. }

    }

  4. 服务端自定义处理器

    /* 服务端自定义处理器 1.将标有@RpcService注解的bean进行缓存,方便后续查找调用 2.接收客户端的请求 3.根据传递过来的beanName从缓存中查找bean 4.通过反射调用bean的方法 5.给客户端响应 /
    @Component
    @ChannelHandler.Sharable //设置通道共享,NettyServerHandler默认是单例的,不能由多个客户端所共享
    public class NettyServerHandler extends SimpleChannelInboundHandler implements ApplicationContextAware {

    1. /** * 定义缓存,将将标有@RpcService注解的bean缓存至此map */
    2. static Map<String, Object> SERVICE_INSTANCE_MAP = new HashMap<>();
    3. /** * 1.将标有@RpcService的注解的bean进行缓存,实现ApplicationContextAware接口后重写的方法 * @param applicationContext ioc容器 */
    4. @Override
    5. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    6. //1.1 通过注解获取bean的集合(可能有多个类都使用了这个注解)
    7. Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
    8. //1.2 循环遍历
    9. Set<Map.Entry<String, Object>> entries = serviceMap.entrySet();
    10. for (Map.Entry<String, Object> entry : entries) {
    11. //serviceBean就是自定义的UserServiceImpl的对象
    12. Object serviceBean = entry.getValue();
    13. if (serviceBean.getClass().getInterfaces().length == 0) {
    14. throw new RuntimeException("对外暴露的服务必须实现接口");
    15. }
    16. //默认处理第一个接口作为缓存bean的名字(一个类可以实现多个接口)
    17. String serviceName = serviceBean.getClass().getInterfaces()[0].getName();
    18. SERVICE_INSTANCE_MAP.put(serviceName, serviceBean);
    19. System.out.println(SERVICE_INSTANCE_MAP);
    20. //{包名.IUserService=包名.UserServiceImpl@44c73c26}
    21. }
    22. }
    23. /** * 2.接收客户端的请求,通道读取就绪事件,读取客户端的消息 * @param ctx 每一个Handler实例与Pipeline之间的桥梁就是ChannelHandlerContext实例 * @param msg 客户端发送过来的消息,是一个JSON格式的RpcRequest的对象 */
    24. @Override
    25. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    26. // 接收客户端的请求
    27. RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class);
    28. //创建响应对象,请求id是请求中传递过来的
    29. RpcResponse rpcResponse = new RpcResponse();
    30. rpcResponse.setRequestId(rpcRequest.getRequestId());
    31. //调用自定义的业务处理方法(上述的3、4步),并将处理结果保存到响应中
    32. try {
    33. rpcResponse.setResult(handler(rpcRequest));
    34. } catch (Exception e) {
    35. e.printStackTrace();
    36. //业务处理过程中出现的异常也要返回给客户端
    37. rpcResponse.setError(e.getMessage());
    38. }
    39. //5. 将响应对象给客户端响应
    40. ctx.writeAndFlush(JSON.toJSONString(rpcResponse));
    41. }
    42. /** * 定义业务处理方法(上述的3、4步) */
    43. private Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
    44. //3.根据传递过来的beanName从缓存中查找,className是请求对象中的属性
    45. Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
    46. if (serviceBean == null) {
    47. throw new RuntimeException("服务端没有找到服务");
    48. }
    49. // 4.通过反射调用bean的方法
    50. // 调用cglid提供的FastClass中的方法,创建指定类的代理对象
    51. FastClass proxyClass = FastClass.create(serviceBean.getClass());
    52. //参数分别表示客户端调用的服务端的方法和参数类型,通过返回的method完成最终的方法调用
    53. FastMethod method = proxyClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
    54. return method.invoke(serviceBean, rpcRequest.getParameters()); //反射
    55. }

    }

通过上述代码可知,必须加了自定义注解 @RpcService 的 bean 才会被缓存(服务才可以被调用),即自定义注解用来暴露服务,且服务端必须实现接口,约定协议,否则抛出异常。

  1. 服务端启动类

    /* 项目启动的时候需要将服务端启动并监听端口 启动类实现CommandLineRunner接口 CommandLineRunner接口是在容器启动成功后的最后一步回调(类似开机自启动) */
    @SpringBootApplication
    public class ServerBootstrapApplication implements CommandLineRunner {

    1. /** * 注入Netty服务端 */
    2. @Autowired
    3. NettyRpcServer rpcServer;
    4. public static void main(String[] args) {
    5. SpringApplication.run(ServerBootstrapApplication.class, args);
    6. }
    7. /** * 实现CommandLineRunner接口后要重写的方法,创建线程调用服务端的方法启动服务端 */
    8. @Override
    9. public void run(String... args) throws Exception {
    10. new Thread(new Runnable() {
    11. @Override
    12. public void run() {
    13. rpcServer.start("127.0.0.1", 8899);
    14. }
    15. }).start();
    16. }

    }

流程总结:

image-20210713183843918

5. 客户端代码

  1. 客户端业务处理类,向服务端发送数据并接收返回结果

    @Component
    public class NettyRpcClientHandler extends SimpleChannelInboundHandler implements Callable {

    1. ChannelHandlerContext context; //全局的ChannelHandlerContext对象
    2. private String reqMsg; //发送的消息
    3. private String respMsg; //接收的消息
    4. //给发送的消息赋值
    5. public void setReqMsg(String reqMsg) {
    6. this.reqMsg = reqMsg;
    7. }
    8. /** * 通道读取就绪事件,读取服务端消息 * @param msg 接收到的服务端数据 */
    9. @Override
    10. protected synchronized void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    11. respMsg = msg;
    12. //唤醒call方法的等待线程
    13. notify();
    14. }
    15. /*** * 通道连接就绪事件,给全局的ChannelHandlerContext对象赋值 * 不能连接建立之后就给服务端发送消息,因为不知道要发送什么消息,客户端调用controller方法才可发送消息,所以实现Callable接口,调用call方法时才发送消息 */
    16. @Override
    17. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    18. context = ctx;
    19. }
    20. /*** * 使用线程的等待与唤醒完成消息的同步处理,因为netty仅支持异步,不能发送完消息然后等待服务端的消息返回。 */
    21. /** * 实现Callable接口重写的方法 * 通过全局的ChannelHandlerContext对象给服务端发送消息,调用call方法才会发送 */
    22. @Override
    23. public synchronized Object call() throws Exception {
    24. context.writeAndFlush(reqMsg);
    25. //客户端向服务端发送消息之后,要等待服务端的响应,调用本类的channelRead0方法读取服务端的数据
    26. //将线程处于等待状态
    27. wait();
    28. return respMsg; //被唤醒之后,返回从服务端接收到的消息
    29. }

    }

  2. 客户端代码

    /* Netty客户端 1.连接服务端,工程一启动就去连接服务端,实现InitializingBean接口 2.提供发送消息的方法 3.工程停止后关闭资源,实现DisposableBean接口 /
    @Component
    public class NettyRpcClient implements InitializingBean, DisposableBean {

    1. EventLoopGroup group = null;
    2. Channel channel = null;
    3. //注入自定义的客户端处理器
    4. @Autowired
    5. NettyRpcClientHandler nettyRpcClientHandler;
    6. //创建线程池,用于执行自定义处理器中重写的call方法,向服务端发送消息
    7. ExecutorService service = Executors.newCachedThreadPool();
    8. /** * 实现InitializingBean接口要重写的方法 * 1.连接服务端 */
    9. @Override
    10. public void afterPropertiesSet() throws Exception {
    11. try {
    12. //1.1 创建线程组
    13. group = new NioEventLoopGroup();
    14. //1.2 创建客户端启动助手
    15. Bootstrap bootstrap = new Bootstrap();
    16. //1.3 设置参数
    17. bootstrap.group(group)
    18. .channel(NioSocketChannel.class)
    19. .handler(new ChannelInitializer<SocketChannel>() {
    20. @Override
    21. protected void initChannel(SocketChannel ch) throws Exception {
    22. //添加编解码器
    23. ch.pipeline().addLast(new StringDecoder());
    24. ch.pipeline().addLast(new StringEncoder());
    25. //添加自定处理类
    26. ch.pipeline().addLast(nettyRpcClientHandler);
    27. }
    28. });
    29. //1.4 连接服务端
    30. channel = bootstrap.connect("127.0.0.1", 8899).sync().channel();
    31. } catch (Exception e) {
    32. e.printStackTrace();
    33. if (channel != null) {
    34. channel.close();
    35. }
    36. if (group != null) {
    37. group.shutdownGracefully();
    38. }
    39. }
    40. }
    41. /** * 3.工程停止后关闭资源 * 实现DisposableBean接口要重写的方法 */
    42. @Override
    43. public void destroy() throws Exception {
    44. if (channel != null) {
    45. channel.close();
    46. }
    47. if (group != null) {
    48. group.shutdownGracefully();
    49. }
    50. }
    51. /** * 2. 消息发送的方法,调用自定义处理器中的方法定义发送的消息 * @param msg 要发送的消息 * @return 服务端返回的消息 */
    52. public Object send(String msg) throws ExecutionException, InterruptedException {
    53. //调用自定义处理器的方法给要发送的消息赋值
    54. nettyRpcClientHandler.setReqMsg(msg);
    55. //执行自定义处理器中的call方法,向服务端发送消息
    56. Future submit = service.submit(nettyRpcClientHandler);
    57. //得到call方法的返回值,也就是服务端返回的消息
    58. return submit.get();
    59. }

    }

  3. 公共接口IUserService的代理

    调用 IUserService 接口的 getById() 方法时,自动的封装RpcRequest,不然每调用一次方法就手动封装一个RpcRequest对象,不合理。

    而且不论其他的controller访问公共接口时,不用再去创建公共接口,直接访问缓存中的代理对象即可。

    /* IUserService的代理 1. 发送消息之前将消息封装成RpcRequest对象,并发送消息 2. 解析服务器返回的数据 */
    @Component
    public class RpcClientProxy {

    1. @Autowired
    2. NettyRpcClient rpcClient;
    3. //缓存创建好的代理对象,方便复用
    4. Map<Class, Object> SERVICE_PROXY = new HashMap<>();
    5. /** * 获取代理对象,参数表示获取谁的代理对象 */
    6. public Object getProxy(Class serviceClass) {
    7. //从缓存中查找
    8. Object proxy = SERVICE_PROXY.get(serviceClass);
    9. if (proxy == null) {
    10. //创建代理对象
    11. //第二参数表示IUserService
    12. proxy = Proxy.newProxyInstance(this.getClass().getClassLoader(),
    13. new Class[]{ serviceClass}, new InvocationHandler() {
    14. //invoke方法不会被立即调用,只有访问接口的方法时才会调用
    15. @Override
    16. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    17. //1.封装请求对象
    18. RpcRequest rpcRequest = new RpcRequest();
    19. rpcRequest.setRequestId(UUID.randomUUID().toString());
    20. //method是接口的方法封装后的对象,getclass得到的就是接口,而服务端的缓存bean的名字就是接口的名字
    21. rpcRequest.setClassName(method.getDeclaringClass().getName());
    22. rpcRequest.setMethodName(method.getName());
    23. rpcRequest.setParameterTypes(method.getParameterTypes());
    24. rpcRequest.setParameters(args);
    25. try {
    26. //2.发送消息,返回值表示服务端返回的数据
    27. Object msg = rpcClient.send(JSON.toJSONString(rpcRequest));
    28. //3.将服务端的消息转化
    29. RpcResponse rpcResponse = JSON.parseObject(msg.toString(), RpcResponse.class);
    30. if (rpcResponse.getError() != null) {
    31. throw new RuntimeException(rpcResponse.getError());
    32. }
    33. if (rpcResponse.getResult() != null) {
    34. return JSON.parseObject(rpcResponse.getResult().toString(),
    35. method.getReturnType());
    36. }
    37. return null;
    38. } catch (Exception e) {
    39. e.printStackTrace();
    40. throw e;
    41. }
    42. }
    43. });
    44. //将代理对象放入缓存
    45. SERVICE_PROXY.put(serviceClass, proxy);
    46. return proxy;
    47. } else {
    48. return proxy;
    49. }
    50. }

    }

  4. 引用代理类

    /* 引用代理类,使用在controller的IUserService上 */
    @Target(ElementType.FIELD) //作用于字段
    @Retention(RetentionPolicy.RUNTIME) //在运行时可以获取得到
    public @interface RpcReference {
    }

  5. 用户控制类

    /* 用户控制类,远程调用服务端的getUserById方法 */
    @RestController
    @RequestMapping(“/user”)
    public class UserController {

    1. //使用了注解之后,目前还不能使用代理对象,必须经过后续的处理才能将代理对象注入进来
    2. @RpcReference
    3. IUserService userService;
    4. @RequestMapping("/getUserById")
    5. public User getUserById(int id) {
    6. //调用服务端方法
    7. return userService.getById(id);
    8. }

    }

  6. bean的后置增强,将代理对象注入到使用了自定义的 @RpcReference 注解的字段上

    类似于 @Autowired 注解,使用这个注解之后将某个对象注入到这个字段上

    /* bean的后置增强 */
    @Component
    public class MyBeanPostProcessor implements BeanPostProcessor {

    1. @Autowired
    2. RpcClientProxy rpcClientProxy;
    3. /** * 自定义注解的注入 */
    4. @Override
    5. public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    6. //1.得到bean(u)中的所有字段
    7. Field[] declaredFields = bean.getClass().getDeclaredFields();
    8. //遍历字段
    9. for (Field field : declaredFields) {
    10. //2.查找字段中是否包含RpcReference这个注解
    11. RpcReference annotation = field.getAnnotation(RpcReference.class);
    12. if (annotation != null) {
    13. //3.获取代理对象
    14. Object proxy = rpcClientProxy.getProxy(field.getType());
    15. try {
    16. //4.属性注入,field表示加了RpcReference注解的IUserService对象
    17. field.setAccessible(true);
    18. field.set(bean, proxy); //userService由代理对象创建
    19. } catch (IllegalAccessException e) {
    20. e.printStackTrace();
    21. }
    22. }
    23. }
    24. return bean;
    25. }

    }

流程总结:

image-20210713211703690

运行结果:

将服务端先启动,然后启动客户端,进行访问:

image-20210713212210086

发表评论

表情:
评论列表 (有 0 条评论,379人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty高性能RPC框架

    Netty是一个开源的、高性能的、异步事件驱动的网络通信框架,支持多种协议和编码解码器,能够帮助开发人员快速构建高性能、可扩展的网络应用程序。它的主要优势包括: 1. 异步

    相关 基于Netty实现简单RPC框架

            Dubbo采用Netty作为基础通信组件,模仿Dubbo实现简单版的RPC框架。服务消费者和服务提供者约定接口和协议,用于远程调用和TCP请求验证。服务提供者作