手写一个简化版的Dubbo框架

忘是亡心i 2023-05-28 14:27 95阅读 0赞

在学习了Dubbo之后, 我发现自己好像了解了Dubbo的实现原理, 又好像不是很了解, 毕竟我只是背诵了下概念, 没有深入的去看源码. 这里我就来手写一个简化版的Dubbo框架, 通过动手实践来深入理解Dubbo的实现原理.

Dubbo的实现原理

RPC调用的过程

我们先来看下RPC调用的过程.

在这里插入图片描述

  • 服务容器负责启动,加载,运行服务提供者。
  • 服务提供者在启动时,向注册中心注册自己提供的服务。
  • 服务消费者在启动时,向注册中心订阅自己所需的服务。
  • 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  • 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行RPC调用,如果调用失败,再选另一台调用。
  • 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

RPC调用的原理

RPC调用的原理是: 动态代理, 反射, 网络传输.

  • 消费者从注册中心获取到服务提供者的地址后, 与服务提供者建立TCP连接.
  • 消费者将服务的全限定类名(String), 方法名(String), 方法参数类型(Class[]), 方法参数(Object[]), 通过TCP传输给服务提供者.
  • 服务提供者获取到这些数据后, 通过反射调用对应服务的方法, 然后将执行结果通过TCP返回给服务消费者.
  • 整个RPC调用过程被封装到动态代理中, 对用户来说是透明的.

Dubbo架构

Dubbo框架设计分为十层:

在这里插入图片描述

  • service 服务层, 为服务提供者和服务消费者提供接口.
  • config 配置层, 提供dubbo的各种配置.
  • proxy 服务接口透明代理, 生成动态代理.
  • registry 注册中心层, 负责服务的注册与发现.
  • cluster 路由层, 封装多个提供者的路由及负载均衡.
  • monitor 监控层, RPC调用次数和调用时间监控.
  • protocol 远程调用层, 封装 RPC 调用.
  • exchange 信息交换层, 封装请求响应模式, 同步转异步.
  • transport 网络传输层, 抽象 mina 和 netty 为统一接口.
  • serialize 数据序列化层, 提供数据序列化的接口.

手写简化版的Bubbo框架

我们根据Dubbo的框架设计来手写一个简化版的Dubbo, 其中序列化协议使用Java原生的Serializable, 网络传输协议使用原生的TCP, 负载均衡使用随机算法, 注册中心使用ZooKeeper, 动态代理使用JDK Proxy.

github地址: 手写一个简化版的Dubbo框架

服务提供者

(1) ZooKeeper常量

定义了ZooKeeper的地址和Dubbo注册中心的根节点路径.

  1. /**
  2. * @author litianxiang
  3. * @date 2020/3/17 11:45
  4. */
  5. public class ZooKeeperConst {
  6. /**
  7. * ZooKeeper的地址
  8. */
  9. public static String host = "xxx.xx.xx.xxx:2181";
  10. /**
  11. * Dubbo在ZooKeeper上的根节点
  12. */
  13. public static String rootNode = "/dubbo";
  14. }

(2) 注册中心

这里使用ZooKeeper来实现注册中心, 将服务及服务提供者地址注册到注册中心.

  1. /**
  2. * @author litianxiang
  3. * @date 2020/3/17 11:28
  4. */
  5. public class RegisterCenter {
  6. private static Logger logger = LoggerFactory.getLogger(RegisterCenter.class);
  7. private ZooKeeper zk;
  8. /**
  9. * 连接ZooKeeper, 创建dubbo根节点
  10. */
  11. public RegisterCenter() {
  12. try {
  13. CountDownLatch connectedSignal = new CountDownLatch(1);
  14. zk = new ZooKeeper(ZooKeeperConst.host, 5000, new Watcher() {
  15. @Override
  16. public void process(WatchedEvent event) {
  17. if (event.getState() == Event.KeeperState.SyncConnected) {
  18. connectedSignal.countDown();
  19. }
  20. }
  21. });
  22. //因为监听器是异步操作, 要保证监听器操作先完成, 即要确保先连接上ZooKeeper再返回实例.
  23. connectedSignal.await();
  24. //创建dubbo注册中心的根节点(持久节点)
  25. if (zk.exists(ZooKeeperConst.rootNode, false) == null) {
  26. zk.create(ZooKeeperConst.rootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  27. }
  28. } catch (Exception e) {
  29. logger.error("connect zookeeper server error.", e);
  30. }
  31. }
  32. /**
  33. * 将服务和服务提供者URL注册到注册中心
  34. * @param serviceName 服务名称
  35. * @param serviceProviderAddr 服务所在TCP地址
  36. */
  37. public void register(String serviceName, String serviceProviderAddr) {
  38. try {
  39. //创建服务节点
  40. String servicePath = ZooKeeperConst.rootNode + "/" + serviceName;
  41. if (zk.exists(servicePath, false) == null) {
  42. zk.create(servicePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  43. }
  44. //创建服务提供者节点
  45. String serviceProviderPath = servicePath + "/" + serviceProviderAddr;
  46. if (zk.exists(serviceProviderPath, false) == null) {
  47. zk.create(serviceProviderPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  48. }
  49. logger.info("服务注册成功, 服务路径: " + serviceProviderPath);
  50. } catch (Exception e) {
  51. logger.error("注册中心-注册服务报错", e);
  52. }
  53. }
  54. }

(3) 接口全限定类名, 方法名, 方法参数类型, 方法参数的包装类

这里为了简单, 使用Java自带的序列化协议.

  1. /**
  2. * 封装接口名, 方法名, 参数字节码数组, 参数对象
  3. */
  4. public class Invocation implements Serializable {
  5. private static final long serialVersionUID = -2798340582119604989L;
  6. /**
  7. * 接口名
  8. */
  9. private String interfaceName;
  10. /**
  11. * 方法名
  12. */
  13. private String methodName;
  14. /**
  15. * 参数字节码数组
  16. */
  17. private Class[] paramTypes;
  18. /**
  19. * 参数对象
  20. */
  21. private Object[] params;
  22. public Invocation(String interfaceName, String methodName, Class[] paramTypes, Object[] params) {
  23. this.interfaceName = interfaceName;
  24. this.methodName = methodName;
  25. this.paramTypes = paramTypes;
  26. this.params = params;
  27. }
  28. public String getInterfaceName() {
  29. return interfaceName;
  30. }
  31. public void setInterfaceName(String interfaceName) {
  32. this.interfaceName = interfaceName;
  33. }
  34. public String getMethodName() {
  35. return methodName;
  36. }
  37. public void setMethodName(String methodName) {
  38. this.methodName = methodName;
  39. }
  40. public Class[] getParamTypes() {
  41. return paramTypes;
  42. }
  43. public void setParamTypes(Class[] paramTypes) {
  44. this.paramTypes = paramTypes;
  45. }
  46. public Object[] getParams() {
  47. return params;
  48. }
  49. public void setParams(Object[] params) {
  50. this.params = params;
  51. }
  52. }

(4) RPC监听服务

用来监听Consumer远程调用的TCP连接, 接收到Consumer传输过来的数据后, 通过反射调用对应的方法, 然后将结果返回给Consumer. Dubbo使用的是Netty框架, 这里为了简单, 我们使用原生的TCP连接.

  1. /**
  2. * RPC监听服务, 监听consumer远程调用的tcp连接
  3. * @author litianxiang
  4. * @date 2020/3/17 18:01
  5. */
  6. public class RpcServer {
  7. private static Logger logger = LoggerFactory.getLogger(RpcServer.class);
  8. private Map<String, Class> serviceMap;
  9. public RpcServer(Map<String, Class> serviceMap) {
  10. this.serviceMap = serviceMap;
  11. }
  12. /**
  13. * 启动RPC监听服务
  14. */
  15. public void start() {
  16. //监听端口, 处理rpc请求
  17. ServerSocket serverSocket = null;
  18. try {
  19. serverSocket = new ServerSocket(12000);
  20. logger.info("RPC监听服务启动...");
  21. while (true) {
  22. Socket socket = serverSocket.accept();
  23. new Thread(new ServerHandler(socket, serviceMap)).start();
  24. }
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. } finally {
  28. if (serverSocket != null) {
  29. try {
  30. serverSocket.close();
  31. } catch (IOException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. }
  37. }
  38. /**
  39. * 处理RPC, 通过反射执行方法
  40. * @author litianxiang
  41. * @date 2020/3/6 17:52
  42. */
  43. public class ServerHandler implements Runnable {
  44. private Socket socket;
  45. private Map<String, Class> serviceMap;
  46. public ServerHandler(Socket socket, Map<String, Class> serviceMap) {
  47. this.socket = socket;
  48. this.serviceMap = serviceMap;
  49. }
  50. @Override
  51. public void run() {
  52. ObjectInputStream in = null;
  53. ObjectOutputStream out = null;
  54. try {
  55. in = new ObjectInputStream(socket.getInputStream());
  56. out = new ObjectOutputStream(socket.getOutputStream());
  57. //获取Invocation对象
  58. Invocation invocation = (Invocation) in.readObject();
  59. //执行对应方法
  60. Class clazz = serviceMap.get(invocation.getInterfaceName());
  61. Method method = clazz.getMethod(invocation.getMethodName(), invocation.getParamTypes());
  62. Object invoke = method.invoke(clazz.newInstance(), invocation.getParams());
  63. //返回方法执行结果
  64. out.writeObject(invoke);
  65. out.flush();
  66. } catch (Exception e) {
  67. e.printStackTrace();
  68. } finally {
  69. if (in != null) {
  70. try {
  71. in.close();
  72. } catch (IOException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. if (out != null) {
  77. try {
  78. out.close();
  79. } catch (Exception e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. if (socket != null) {
  84. try {
  85. socket.close();
  86. } catch (IOException e) {
  87. e.printStackTrace();
  88. }
  89. }
  90. socket = null;
  91. }
  92. }
  93. }

(5) Provider启动类

这里会模拟dubbo的service配置, 将接口名及其对应的实现类储存到serviceMap中, 然后将服务和服务提供者地址注册到注册中心, 最后再启动对Consumer远程调用的监听.

  1. /**
  2. * @author litianxiang
  3. * @date 2020/3/6 15:32
  4. */
  5. public class Provider {
  6. private static Logger logger = LoggerFactory.getLogger(Provider.class);
  7. private static Map<String, Class> serviceMap = new HashMap<>();
  8. private static String tcpHost = "127.0.0.1:12000";
  9. static {
  10. /**
  11. * 模拟service配置处理逻辑
  12. * <dubbo:service interface="com.client.service.IBookService" ref="bookService" />
  13. * <bean id="bookService" class="com.provider.service.BookServiceImpl" />
  14. */
  15. serviceMap.put(IBookService.class.getName(), BookServiceImpl.class);
  16. }
  17. public static void main(String[] args) {
  18. //将服务和服务提供者URL注册到注册中心
  19. RegisterCenter registerCenter = new RegisterCenter();
  20. for (Map.Entry<String, Class> entry : serviceMap.entrySet()) {
  21. registerCenter.register(entry.getKey(), tcpHost);
  22. }
  23. //监听Consumer的远程调用(为了简化代码, 这里使用TCP代替Netty)
  24. RpcServer rpcServer = new RpcServer(serviceMap);
  25. rpcServer.start();
  26. }
  27. }

服务消费者

(1) 负载均衡

为了简单, 这里直接使用的是随机算法.

  1. public class RandomLoadBalance {
  2. /**
  3. * 随机一个provider
  4. * @param providerList provider列表
  5. * @return provider
  6. */
  7. public String doSelect(List<String> providerList) {
  8. int size = providerList.size();
  9. Random random = new Random();
  10. return providerList.get(random.nextInt(size));
  11. }
  12. }

(2) 服务订阅类

服务订阅类提供向注册中心订阅服务的功能, 涉及服务发现与负载均衡.

  1. public class ServiceSubscribe {
  2. private static Logger logger = LoggerFactory.getLogger(ServiceSubscribe.class);
  3. private ZooKeeper zk;
  4. private List<String> providerList;
  5. /**
  6. * 连接ZooKeeper, 创建dubbo根节点
  7. */
  8. public ServiceSubscribe() {
  9. try {
  10. CountDownLatch connectedSignal = new CountDownLatch(1);
  11. zk = new ZooKeeper(ZooKeeperConst.host, 5000, new Watcher() {
  12. @Override
  13. public void process(WatchedEvent event) {
  14. if (event.getState() == Event.KeeperState.SyncConnected) {
  15. connectedSignal.countDown();
  16. }
  17. }
  18. });
  19. //因为监听器是异步操作, 要保证监听器操作先完成, 即要确保先连接上ZooKeeper再返回实例.
  20. connectedSignal.await();
  21. //创建dubbo注册中心的根节点(持久节点)
  22. if (zk.exists(ZooKeeperConst.rootNode, false) == null) {
  23. zk.create(ZooKeeperConst.rootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  24. }
  25. } catch (Exception e) {
  26. logger.error("connect zookeeper server error.", e);
  27. }
  28. }
  29. /**
  30. * 在注册中心订阅服务, 返回对应的服务url
  31. * 只要第一次获取到了服务的RPC地址, 后面注册中心挂掉之后, 仍然可以继续通信.
  32. * @param serviceName 服务名称
  33. * @return 服务host
  34. */
  35. public String subscribe(String serviceName) {
  36. //服务节点路径
  37. String servicePath = ZooKeeperConst.rootNode + "/" + serviceName;
  38. try {
  39. //获取服务节点下的所有子节点, 即服务的RPC地址
  40. providerList = zk.getChildren(servicePath, new Watcher() {
  41. @Override
  42. public void process(WatchedEvent event) {
  43. if (event.getType() == Event.EventType.NodeChildrenChanged) {
  44. try {
  45. //循环监听
  46. providerList = zk.getChildren(servicePath, true);
  47. } catch (KeeperException | InterruptedException e) {
  48. logger.error("Consumer在ZooKeeper订阅服务-注册监听器报错", e);
  49. }
  50. }
  51. }
  52. });
  53. } catch (Exception e) {
  54. logger.error("从注册中心获取服务报错.", e);
  55. }
  56. logger.info(serviceName + "的服务提供者列表: " + providerList);
  57. //负载均衡
  58. RandomLoadBalance randomLoadBalance = new RandomLoadBalance();
  59. return randomLoadBalance.doSelect(providerList);
  60. }
  61. }

(3) RPC代理类

根据JDK Proxy生成一个代理对象, 封装RPC调用的过程.

  1. public class RpcServiceProxy {
  2. private ServiceSubscribe serviceSubscribe;
  3. public RpcServiceProxy(ServiceSubscribe serviceSubscribe) {
  4. this.serviceSubscribe = serviceSubscribe;
  5. }
  6. /**
  7. * 获取RPC代理
  8. * @param clazz
  9. * @return
  10. */
  11. public Object getProxy(final Class clazz) {
  12. return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
  13. @Override
  14. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  15. //在注册中心订阅服务, 返回对应的服务url
  16. String rpcHost = serviceSubscribe.subscribe(clazz.getName());
  17. String[] split = rpcHost.split(":");
  18. //与远程服务建立连接
  19. Socket socket = new Socket(split[0], Integer.parseInt(split[1]));
  20. ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
  21. ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
  22. //向RPC服务传输Invocation对象
  23. String className = clazz.getName();
  24. String methodName = method.getName();
  25. Class[] paramTypes = method.getParameterTypes();
  26. Invocation invocation = new Invocation(className, methodName, paramTypes, args);
  27. out.writeObject(invocation);
  28. out.flush();
  29. //接收方法执行结果
  30. Object object = in.readObject();
  31. in.close();
  32. out.close();
  33. socket.close();
  34. return object;
  35. }
  36. });
  37. }
  38. }

(4) Consumer启动类

消费者启动后, 会向注册中心订阅服务, 经过负载均衡获取到对应的服务后, 再进行RPC调用.

  1. public class Consumer {
  2. private static Logger logger = LoggerFactory.getLogger(Consumer.class);
  3. public static void main(String[] args) {
  4. //在注册中心订阅服务, 获取服务所在的url, 然后通过代理远程调用服务
  5. ServiceSubscribe serviceSubscribe = new ServiceSubscribe();
  6. RpcServiceProxy rpcServiceProxy = new RpcServiceProxy(serviceSubscribe);
  7. //获取RPC代理
  8. IBookService bookService = (IBookService) rpcServiceProxy.getProxy(IBookService.class);
  9. BookDTO bookInfo = bookService.getBookInfo(1);
  10. System.out.println(bookInfo);
  11. }
  12. }

测试

(1) 先修改注册中心的地址

  1. public static String host = "xxx.xx.xx.xxx:2181";

(2) Service

  1. public class BookDTO implements Serializable{
  2. private static final long serialVersionUID = 1934175717377394706L;
  3. private int id;
  4. private String name;
  5. private String desc;
  6. private String author;
  7. public int getId() {
  8. return id;
  9. }
  10. public void setId(int id) {
  11. this.id = id;
  12. }
  13. public String getName() {
  14. return name;
  15. }
  16. public void setName(String name) {
  17. this.name = name;
  18. }
  19. public String getDesc() {
  20. return desc;
  21. }
  22. public void setDesc(String desc) {
  23. this.desc = desc;
  24. }
  25. public String getAuthor() {
  26. return author;
  27. }
  28. public void setAuthor(String author) {
  29. this.author = author;
  30. }
  31. @Override
  32. public String toString() {
  33. return "BookDTO{" +
  34. "id=" + id +
  35. ", name='" + name + '\'' +
  36. ", desc='" + desc + '\'' +
  37. ", author='" + author + '\'' +
  38. '}';
  39. }
  40. }
  41. public interface IBookService {
  42. BookDTO getBookInfo(int id);
  43. }
  44. public class BookServiceImpl implements IBookService {
  45. @Override
  46. public BookDTO getBookInfo(int id) {
  47. if (id == 1) {
  48. BookDTO bookDTO = new BookDTO();
  49. bookDTO.setId(1);
  50. bookDTO.setName("仙逆");
  51. bookDTO.setDesc("顺为凡, 逆为仙, 只在心中一念间.");
  52. bookDTO.setAuthor("耳根");
  53. return bookDTO;
  54. } else {
  55. return new BookDTO();
  56. }
  57. }
  58. }

(3) 启动Provider

在这里插入图片描述

(4) 启动Consumer

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,95人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Javadubbo框架

    前言 在对dubbo有了较为深入的使用和理解后,来尝试从dubbo框架的角度重新认识下它,对照着dubbo官方的这张图进行反复的理解后,我们可以从已有掌握的技术出发,来尝