Dubbo之手写RPC框架

Love The Way You Lie 2022-12-10 03:53 404阅读 0赞

建议从Dubbo之@SPI开始看。

关键词:Dubbo RPC


纯手写实现一个简单的RPC调用,帮助更好地学习和理解Dubbo

RPC-远程过程调用,我感觉可以理解成客户端(即消费者)通过TCP加上特定的消息协议访问服务端(即提供者),服务端根据消息协议内容调用本地方法并响应给客户端。就好像浏览器采用http协议,通过TCP传输去调用服务端接口一样,只不过http调用的是服务端的接口,接口其实对应着某个特定的方法。而RPC则直接调用服务端的方法。

关于协议:协议就是双方约定好的一种消息格式,这样发送消息的人发送出去的消息,接收消息的人才能够认识。就好比,两个人通信,发信人用的是中文,收信人呢就去查看新华字典来一个个读取信件中的内容,然后便知道和理解发信人的目的和行为了。你想想,如果此时的收信人拿出一本牛津词典查来查去,他是如论如何都不会理解中文的信件的。这里的新华字典就好比协议。

通过简单的代码来实现简单的RPC调用,这样更有助于理解和使用Dubbo,Dubbo包装了很多功能,理解起来还是蛮困难的。我感觉就像上面说的那样,本质还是TCP调用。TCP是传输层的协议了,比较底层,在JAVA中对应着就是Socket和ServerSocket。


█ 总体认识

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81MDUxODI3MQ_size_16_color_FFFFFF_t_70

  • 我代码都放在一个模块里面了,实际的项目中,客户端代码和服务端代码是分开的,有可能就是放在不同的服务器上。公共代码是客户端和服务端都需要的。
  • 客户端代码就是消费者,也就是服务的调用方;服务端代码就是提供者,也就是服务的提供者

█ 公共代码

  1. package common;
  2. /**
  3. *
  4. * 接口,面向接口调用。客户端调用的是接口的代理,
  5. * 服务端真正调用接口的实现类(即CatService或DogService)
  6. *
  7. */
  8. public interface AnimalService {
  9. String say();
  10. int age(int age);
  11. }

协议类,客户端和服务端都需要使用。客户端根据协议的规定格式创建请求信息,服务端根据协议的规定格式解析请求内容。我这里的协议比较简单,就是字符串拼接,包含了请求的接口,方法和参数等信息(这样都是定位到调用的具体方法必不可少的条件)。dubbo-start的作用是用来标识一段请求消息的内容的。当多个客户端同时请求了服务端,服务端获取到的请求内容可能会被放到同一个字节数组里面,这样dubbo-start方面拆分每一条请求。请求的唯一标识的作用差不多,能够方便客户端解析响应消息的时候,对应上自己的哪一次请求。

  1. package common;
  2. import java.util.ArrayList;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. /**
  7. * 协议类,用于组装、拆解交流的信息
  8. *
  9. * 协议规定:一个请求必须以dubbo-start开头,第二个是请求的唯一标识 第三个是类的全限定名,第四个是方法名 第五个是参数类型列表
  10. * 第六个是参数值,(参数类型和参数值,多个用逗号分割)
  11. * 值与值中间用一个空行分割
  12. *
  13. */
  14. public class Protocol {
  15. /**
  16. * 构建请求信息,即客户端按照协议规定的格式构建请求信息
  17. * @param requestId 请求唯一标识
  18. * @param targetClass 请求的目标类
  19. * @param methodName 请求的目标方法
  20. * @param paramTypes 目标方法的参数类型
  21. * @param paramValues 传给目标方法的参数
  22. * @return
  23. */
  24. public byte[] packRequest(String requestId, Class targetClass, String methodName, Class[] paramTypes, Object[] paramValues) {
  25. StringBuilder sb = new StringBuilder();
  26. // 拼接协议信息
  27. sb.append("dubbo-start").append(" ").append(requestId).append(" ").append(targetClass.getName()).append(" ").
  28. append(methodName).append(" ");
  29. // 处理参数类型
  30. if (paramTypes!=null && paramTypes.length>0) {
  31. StringBuilder paramType = new StringBuilder();
  32. for (Class type : paramTypes) {
  33. paramType.append(type.getName()).append(",");
  34. }
  35. // 去掉结尾的逗号
  36. String substring = paramType.toString().substring(0, paramType.toString().length() - 1);
  37. sb.append(substring).append(" ");
  38. }
  39. // 处理参数值
  40. if (paramValues!=null && paramValues.length>0) {
  41. StringBuilder paramValue = new StringBuilder();
  42. for (Object value : paramValues) {
  43. paramValue.append(value).append(",");
  44. }
  45. // 去掉结尾的逗号
  46. String substring = paramValue.toString().substring(0, paramValue.toString().length() - 1);
  47. sb.append(substring);
  48. }
  49. return sb.toString().getBytes();
  50. }
  51. /**
  52. * 拆解请求信息,即服务端解析客户端的请求信息
  53. * @param bytes
  54. * @param len
  55. * @return
  56. */
  57. public Map<String, Object> unpackRequest(byte[] bytes, int len) {
  58. Map<String, Object> map = new HashMap<>();
  59. String recv = new String(bytes, 0, len);
  60. // 根据协议约定,按照空格分隔各个请求信息
  61. String[] split = recv.split(" ");
  62. System.out.println("请求信息:"+split);
  63. // 请求ID
  64. map.put(Const.REQUEST_ID, split[1]);
  65. // 接口名
  66. map.put(Const.INTERFACE_NAME, split[2]);
  67. // 方法名
  68. map.put(Const.METHOD_NAME, split[3]);
  69. // 参数类型
  70. List<Class> paramTypelist = new ArrayList<>();
  71. if (split.length>4 && split[4]!=null) {
  72. String[] types = split[4].split(",");
  73. for (String type : types) {
  74. paramTypelist.add(convertParamType(type));
  75. }
  76. map.put(Const.PARAM_TYPES, paramTypelist);
  77. }
  78. // 参数值
  79. if (split.length>5 && split[5]!=null) {
  80. String[] values = split[5].split(",");
  81. List<Object> valueList = new ArrayList<>(values.length);
  82. for (int i=0; i<values.length; i++) {
  83. valueList.add(convertParamValue(paramTypelist.get(i), values[i]));
  84. }
  85. map.put(Const.PARAM_VALUES, valueList);
  86. }
  87. return map;
  88. }
  89. /**
  90. * 参数类型转换
  91. * @param type
  92. * @return
  93. */
  94. private Class convertParamType(String type) {
  95. // 简单地举了几个例子
  96. if ("int".equals(type)) {
  97. return int.class;
  98. } else if ("java.util.ArrayList".equals(type) || "ArrayList".equals(type)) {
  99. return ArrayList.class;
  100. }
  101. return String.class;
  102. }
  103. /**
  104. * 参数值转换。解析的时候都转成了字符串,
  105. * 这里需要根据参数具体的类型转换
  106. * @param type
  107. * @param value
  108. * @return
  109. */
  110. private Object convertParamValue(Class type, String value) {
  111. // 简单的举了几个例子
  112. if (type==int.class) {
  113. return Integer.parseInt(value);
  114. } else if (type==double.class) {
  115. return Double.parseDouble(value);
  116. }
  117. return value;
  118. }
  119. }
  120. package common;
  121. /**
  122. *
  123. * 常量类,抽出字符串
  124. *
  125. */
  126. public class Const {
  127. // 请求ID
  128. public static final String REQUEST_ID = "requestId";
  129. // 请求目标接口类型
  130. public static final String INTERFACE_NAME = "interfaceName";
  131. // 请求目标方法
  132. public static final String METHOD_NAME = "methodName";
  133. // 请求方法参数类型
  134. public static final String PARAM_TYPES = "paramTypes";
  135. // 请求方法参数值
  136. public static final String PARAM_VALUES = "paramValues";
  137. }

█ 服务端

Service-包装了服务发布的功能,即通过创建的ServerSocket来接收和响应客户端的请求。

  1. package server;
  2. import common.Const;
  3. import common.Protocol;
  4. import java.io.InputStream;
  5. import java.io.OutputStream;
  6. import java.lang.reflect.Method;
  7. import java.net.ServerSocket;
  8. import java.net.Socket;
  9. import java.util.List;
  10. import java.util.Map;
  11. import java.util.concurrent.ConcurrentHashMap;
  12. /**
  13. *
  14. * 包装服务端服务发布的功能
  15. *
  16. */
  17. public class Service<T> {
  18. // 用于存放接口和实现类对象的关系,作为服务端服务字典,方便查找
  19. private static ConcurrentHashMap<String, Object> beanMap = new ConcurrentHashMap<>();
  20. // 协议,在服务端,协议的作用就是解析消费端发送过来的消息
  21. private Protocol protocol = new Protocol();
  22. // 接口的实现类
  23. private T obejct;
  24. // 接口类型
  25. private Class interfaceClass;
  26. public Service(Class interfaceClass, T object) {
  27. // 前置校验,借实现类是否实现了接口
  28. boolean assignableFrom = interfaceClass.isAssignableFrom(object.getClass());
  29. if (!assignableFrom) {
  30. throw new IllegalArgumentException("object 必须实现interfaceClass接口");
  31. }
  32. this.interfaceClass = interfaceClass;
  33. this.obejct = object;
  34. // 缓存起来,方便客户端调用的时候,查找目标类
  35. beanMap.put(interfaceClass.getName(), object);
  36. }
  37. public void export() {
  38. try {
  39. ServerSocket serverSocket = ServiceServer.getServerSocket();
  40. while (true) {
  41. // 这里会阻塞,等待客户端的连接
  42. Socket socket = serverSocket.accept();
  43. // 响应结果
  44. byte[] result = new byte[1];
  45. // 接收消费者的请求
  46. InputStream inputStream = socket.getInputStream();
  47. byte[] recv = new byte[1024];
  48. // 这里会阻塞,等待客户端的请求信息
  49. int len = inputStream.read(recv);
  50. // 根据协议,转换请求信息
  51. Map<String, Object> requestMap = protocol.unpackRequest(recv, len);
  52. // 获取调用的接口
  53. Object interfaceName = requestMap.get(Const.INTERFACE_NAME);
  54. // 获取接口实现类对象
  55. Object object = beanMap.get(interfaceName);
  56. if (object==null) {
  57. result = "请求的接口不存在".getBytes();
  58. } else {
  59. // 方法名
  60. Object methodName = requestMap.get(Const.METHOD_NAME);
  61. // 参数类型
  62. Class[] paramTypes = null;
  63. List<Class> paramTypeList = (List<Class>)requestMap.get(Const.PARAM_TYPES);
  64. if (paramTypeList!=null && paramTypeList.size()>0) {
  65. paramTypes = paramTypeList.toArray(new Class[paramTypeList.size()]);
  66. }
  67. // 参数值
  68. Object[] paramValues = null;
  69. List<Object> paramValueList = (List<Object>)requestMap.get(Const.PARAM_VALUES);
  70. if (paramValueList!=null && paramValueList.size()>0) {
  71. paramValues = paramValueList.toArray();
  72. }
  73. // 获取到调用的方法
  74. Method method = object.getClass().getMethod(methodName.toString(), paramTypes);
  75. // 方法调用结果
  76. Object invoke = method.invoke(object, paramValues);
  77. if (invoke!=null) {
  78. result = invoke.toString().getBytes();
  79. }
  80. }
  81. // 将结果返回给客户端
  82. OutputStream os = socket.getOutputStream();
  83. os.write(result);
  84. os.flush();
  85. os.close();
  86. inputStream.close();
  87. }
  88. } catch (Exception e) {
  89. e.printStackTrace();
  90. }
  91. }
  92. }
  93. package server;
  94. import java.net.ServerSocket;
  95. /**
  96. * 创建ServerSocket,暴露服务
  97. * 多次服务暴露,都使用一个ServerSocket
  98. * 这里的端口号就固定写了8899
  99. *
  100. */
  101. public class ServiceServer {
  102. private static volatile ServerSocket serverSocket;
  103. private ServiceServer() {
  104. throw new IllegalStateException();
  105. }
  106. /**
  107. * 创建一个ServerSocket,用于暴露服务。
  108. * @return
  109. */
  110. public static ServerSocket getServerSocket() {
  111. if (serverSocket==null) {
  112. synchronized (ServiceServer.class) {
  113. if (serverSocket==null) {
  114. try {
  115. // 因为没有注册中心的功能,没法让消费者去感知服务的端口,这里端口就写死了
  116. serverSocket = new ServerSocket(8899);
  117. } catch (Exception e) {
  118. e.printStackTrace();
  119. }
  120. }
  121. }
  122. }
  123. return serverSocket;
  124. }
  125. }
  126. package server;
  127. import common.AnimalService;
  128. public class DogService implements AnimalService {
  129. @Override
  130. public String say() {
  131. return "this is a dog";
  132. }
  133. @Override
  134. public int age(int age) {
  135. return age;
  136. }
  137. }
  138. package server;
  139. import common.AnimalService;
  140. public class CatService implements AnimalService {
  141. @Override
  142. public String say() {
  143. return "this is a cat";
  144. }
  145. @Override
  146. public int age(int age) {
  147. return age + 10;
  148. }
  149. }

█ 客户端

  1. package client;
  2. import common.Protocol;
  3. import java.io.InputStream;
  4. import java.io.OutputStream;
  5. import java.lang.reflect.InvocationHandler;
  6. import java.lang.reflect.Method;
  7. import java.lang.reflect.Proxy;
  8. import java.net.Socket;
  9. import java.util.UUID;
  10. /**
  11. * 服务端调用代理类
  12. * 客户端并不知道在服务端上接口具体的实现类是哪个,只能通过调用接口来获取。
  13. * 创建接口的代理,主要工作就是创建Socket去连接服务端,并按照协议格式发送请求
  14. *
  15. */
  16. public class Reference implements InvocationHandler {
  17. // 协议
  18. private Protocol protocol = new Protocol();
  19. private Class interfaceClass;
  20. public Reference(Class interfaceClass) {
  21. this.interfaceClass = interfaceClass;
  22. }
  23. public Object getReference() {
  24. return Proxy.newProxyInstance(Reference.class.getClassLoader(), new Class[]{interfaceClass}, this);
  25. }
  26. @Override
  27. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  28. Object result = null;
  29. String name = method.getName();
  30. // 如果是父类Object中的方法就直接返回方法名,这里举几个例子,不全
  31. if ("equals".equals(name) || "toString".equals(name)) {
  32. return name;
  33. }
  34. Socket socket = new Socket("localhost", 8899);
  35. // 客户端发送请求
  36. OutputStream os = socket.getOutputStream();
  37. // 通过约定好的协议格式构建请求信息
  38. byte[] requestByte = protocol.packRequest(UUID.randomUUID().toString(), interfaceClass, name, method.getParameterTypes(), args);
  39. os.write(requestByte);
  40. os.flush();
  41. os.close();
  42. // 客户端接收响应结果
  43. InputStream is = socket.getInputStream();
  44. byte[] recv = new byte[1024];
  45. int read = is.read(recv);
  46. is.close();
  47. // 这里应该还有解码器的,我就简单地写了一下
  48. Class<?> returnType = method.getReturnType();
  49. if (String.class==returnType) {
  50. result = new String(recv, 0, read);
  51. } else if (int.class==returnType || Integer.class==returnType) {
  52. return Integer.parseInt(new String(recv, 0, read));
  53. }
  54. return result;
  55. }
  56. }

█ 运行服务端

  1. package server;
  2. import common.AnimalService;
  3. public class Main {
  4. public static void main(String[] args) {
  5. // 暴露DogService
  6. Service service = new Service(AnimalService.class, new DogService());
  7. // 启动一个ServerSocket,等待客户端的连接和接收消息
  8. service.export();
  9. }
  10. }

█ 运行客户端

  1. package client;
  2. import common.AnimalService;
  3. public class Main {
  4. public static void main(String[] args) {
  5. // 客户端创建代理对象,代理去实现请求服务端的逻辑
  6. Reference reference = new Reference(AnimalService.class);
  7. AnimalService animalService = (AnimalService)reference.getReference();
  8. // 当调用say方法,实际会调用Reference的invoke方法
  9. String say = animalService.say();
  10. System.out.println(say);
  11. System.out.println("获取到的age="+animalService.age(10));
  12. // 输出结果:
  13. // this is a dog
  14. //获取到的age=10
  15. }
  16. }

█ 总结

上面代码写的粗糙,功能也粗糙,只是简单地模拟了一下RPC调用,实现了服务端和客户端,根据协议构建和解析协议的功能。希望看完能够帮助更好的去理解Dubbo。在Dubbo中还实现了注册中心,路由选择,负载均衡,容错等等强大的功能,这些功能有机会的话会在后面介绍(希望我能坚持写下去)。我想,只有理解了RPC的工作原理,才能更好的去学习和理解Dubbo的功能。

关于Dubbo的使用、服务暴露、服务注册、负载均衡等,官网已经有很详细的介绍了。我就不重复写了,写了也就是分析源码。具体请移步Dubbo官网慢慢品味:我简述一下自己关于Dubbo的理解:

服务端:

  • 服务暴露:将ServiceConfig对应的接口实现类对外提供调用服务,让客户端(消费者)能够请求调用。

  • 服务注册:将ServiceConfig对应的接口实现类注册到注册中心上,这样是对客户端透明,即提供了一个公示栏,让客户端调用者查看其可以调用哪些服务。

  • 一个或@Service,就对应一个ServiceConfig实例。ServiceConfig是Dubbo服务暴露和注册的起点。开始于export方法

  • ServiceConfig在暴露和注册服务时,使用了Invoker这样的代理类,能够灵活地调用具体的接口实现类,而不用写死代码。

  • 服务暴露时,一个Invoker又会被封装成一个Exporter,由这个Exporter负责具体的服务暴露和引用工作。
  • 所有被暴露的服务都会缓存在Map> exporterMap = new ConcurrentHashMap(),exporterMap之中,key是服务的标识信息(端口,版本号,分组名)。这样当消费者待着请求信息过来请求时,Dubbo提取一些参数值,组装成key,去本地缓存中查找对应的Exporter,而Exporter中又包含了Invoker,这样自然就调用了实际的接口实现类方法了。
  • 真正做服务暴露和注册的是协议Protocol,不如DubboProtocol,RegistryProtocol。

Service ——-> ServiceConfig ———-> Invoker ——-> Exporter

消费端:

  • 一个或@Reference对应一个ReferenceConfig。起点是get方法。
  • 一个ReferenceConfig也是对应一个Invoker,通过Invoker代理服务端的调用。
  • Invoker外包装了一层Directory。这个可以理解成消费者端的本地缓存,缓存了服务端的服务调用。当发起远程调用时,要去Directory这个缓存中去查找对应的Invoker,由Invoker完成具体的调用过程。
  • Directory又被Cluster所有。Cluster根据Directory中缓存的Invoker列表完成路由、负载均衡和容错功能

Reference ——-> ReferenceConfig ———> Cluster ———-> Directory ——-> Invoker———->(Exporter ———>Invoker——->Service)

发表评论

表情:
评论列表 (有 0 条评论,404人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RPC框架泛化调用

    一、背景 前段时间了解了泛化调用这个玩意儿,又想到自己之前写过一个RPC框架(参考《[手写一个RPC框架][RPC]》),于是便想小试牛刀。 二、泛化调用简介 什

    相关 Javadubbo框架

    前言 在对dubbo有了较为深入的使用和理解后,来尝试从dubbo框架的角度重新认识下它,对照着dubbo官方的这张图进行反复的理解后,我们可以从已有掌握的技术出发,来尝

    相关 RPC框架(netty+zookeeper)

      RPC是什么?远程过程调用,过程就是业务处理、计算任务,像调用本地方法一样调用远程的过程。   RMI和RPC的区别是什么?RMI是远程方法调用,是oop领域中RPC的一

    相关 RPC框架 Dubbo

    一、分布式基础理论 -------------------- 1、什么是分布式系统 《分布式系统原理与范型》定义: 分布式系统是若干独立计算机的集合,这些计算机