从零写分布式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模块改造

我就是我 2022-04-14 03:11 484阅读 0赞

2.0版本RPC-Server改动不大,主要变化在于RPC-Client使用了服务地址缓存,并引入监控机制,第一时间获取zk集群中服务地址信息变化并刷新本地缓存。另外,RPC-Client还使用了RpcClientProperties开放对负载均衡策略和序列化策略的选择。

系列文章:

专栏:从零开始写分布式RPC框架
项目GitHub地址:https://github.com/linshenkx/rpc-netty-spring-boot-starter

手写通用类型负载均衡路由引擎(含随机、轮询、哈希等及其带权形式)
实现 序列化引擎(支持 JDK默认、Hessian、Json、Protostuff、Xml、Avro、ProtocolBuffer、Thrift等序列化方式)
从零写分布式RPC框架 系列 2.0 (1)架构升级
从零写分布式RPC框架 系列 2.0 (2)RPC-Common模块设计实现
从零写分布式RPC框架 系列 2.0 (3)RPC-Server和RPC-Client模块改造
从零写分布式RPC框架 系列 2.0 (4)使用BeanPostProcessor实现自定义@RpcReference注解注入

文章目录

    • 系列文章:
    • RPC-Server
      • 1 结构图
      • 2 RpcService注解
      • 3 ServiceInfo
      • 4 RpcServer
    • RPC-Client
      • 1 结构图
      • 2 RpcClientProperties
      • 3 ZKServiceDiscovery
      • 4 RpcClient

RPC-Server

1 结构图

结构图注意,RpcService注解移动到了RPC-Common模块下,另外新加了ServiceInfo代表将存到注册中心的服务信息(也在RPC-Common模块下),其他的除了RpcServer基本没有变化

2 RpcService注解

主要是多了weight和workerThreads,分别代表权重和最大工作线程数。

  1. /** * @version V1.0 * @author: lin_shen * @date: 2018/10/31 * @Description: * RPC服务注解(标注在rpc服务实现类上) * 使用@Service注解使被@RpcService标注的类都能被Spring管理 */
  2. @Target({ ElementType.TYPE})
  3. @Retention(RetentionPolicy.RUNTIME)
  4. @Service
  5. public @interface RpcService {
  6. Class<?> value();
  7. int weight() default 1;
  8. int workerThreads() default 10;
  9. }

3 ServiceInfo

  1. /** * @version V1.0 * @author: lin_shen * @date: 18-11-13 * @Description: 服务信息,用于存储到注册中心 */
  2. @Data
  3. @AllArgsConstructor
  4. @NoArgsConstructor
  5. public class ServiceInfo implements WeightGetAble {
  6. private String host;
  7. private int port;
  8. /** * 权重信息 */
  9. private int weight;
  10. /** * 最大工作线程数 */
  11. private int workerThreads;
  12. public ServiceInfo (ServiceInfo serviceInfo){
  13. this.host = serviceInfo.host;
  14. this.port = serviceInfo.port;
  15. this.weight = serviceInfo.weight;
  16. this.workerThreads = serviceInfo.workerThreads;
  17. }
  18. @Override
  19. public int getWeightFactors() {
  20. return getWeight();
  21. }
  22. }

4 RpcServer

RpcServer主要是多了对 serviceSemaphoreMap 和 serviceRpcServiceMap的管理。其中serviceSemaphoreMap 将作为参数传入RpcServerHandler提供限流信息,而serviceRpcServiceMap将注册到ZK集群。

  1. /** * @version V1.0 * @author: lin_shen * @date: 2018/10/31 * @Description: TODO */
  2. @Log4j2
  3. @AutoConfigureAfter({ ZKServiceRegistry.class})
  4. @EnableConfigurationProperties(RpcServerProperties.class)
  5. public class RpcServer implements ApplicationContextAware, InitializingBean {
  6. /** * 存放 服务名称 与 服务实例 之间的映射关系 */
  7. private Map<String,Object> handlerMap=new HashMap<>();
  8. /** * 存放 服务名称 与 信号量 之间的映射关系 * 用于限制每个服务的工作线程数 */
  9. private Map<String, Semaphore> serviceSemaphoreMap=new HashMap<>();
  10. /** * 存放 服务名称 与 服务信息 之间的映射关系 */
  11. private Map<String, RpcService> serviceRpcServiceMap=new HashMap<>();
  12. @Autowired
  13. private RpcServerProperties rpcProperties;
  14. @Autowired
  15. private ZKServiceRegistry rpcServiceRegistry;
  16. /** * 在类初始化时执行,将所有被@RpcService标记的类纳入管理 * @param applicationContext * @throws BeansException */
  17. @Override
  18. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  19. //获取带有@RpcService注解的类
  20. Map<String,Object> rpcServiceMap=applicationContext.getBeansWithAnnotation(RpcService.class);
  21. //以@RpcService注解的value的类的类名为键将该标记类存入handlerMap和serviceSemaphoreMap
  22. if(!CollectionUtils.isEmpty(rpcServiceMap)){
  23. for(Object object:rpcServiceMap.values()){
  24. RpcService rpcService=object.getClass().getAnnotation(RpcService.class);
  25. String serviceName=rpcService.value().getName();
  26. handlerMap.put(serviceName,object);
  27. serviceSemaphoreMap.put(serviceName,new Semaphore(rpcService.workerThreads()));
  28. serviceRpcServiceMap.put(serviceName,rpcService);
  29. }
  30. }
  31. }
  32. /** * 在所有属性值设置完成后执行,负责启动RPC服务 * @throws Exception */
  33. @Override
  34. public void afterPropertiesSet() throws Exception {
  35. //管理相关childGroup
  36. EventLoopGroup bossGroup=new NioEventLoopGroup();
  37. //处理相关RPC请求
  38. EventLoopGroup childGroup=new NioEventLoopGroup();
  39. try {
  40. //启动RPC服务
  41. ServerBootstrap bootstrap=new ServerBootstrap();
  42. bootstrap.group(bossGroup,childGroup);
  43. bootstrap.channel(NioServerSocketChannel.class);
  44. bootstrap.option(ChannelOption.SO_BACKLOG,1024)
  45. .childOption(ChannelOption.TCP_NODELAY,true)
  46. .handler(new LoggingHandler(LogLevel.INFO));
  47. bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  48. @Override
  49. protected void initChannel(SocketChannel channel) throws Exception {
  50. ChannelPipeline pipeline=channel.pipeline();
  51. //解码RPC请求
  52. pipeline.addLast(new RemotingTransporterDecoder());
  53. //编码RPC请求
  54. pipeline.addFirst(new RemotingTransporterEncoder());
  55. //处理RPC请求
  56. pipeline.addLast(new RpcServerHandler(handlerMap,serviceSemaphoreMap));
  57. }
  58. });
  59. //同步启动,RPC服务器启动完毕后才执行后续代码
  60. ChannelFuture future=bootstrap.bind(rpcProperties.getPort()).sync();
  61. log.info("server started,listening on {}",rpcProperties.getPort());
  62. //启动后注册服务
  63. registry();
  64. //释放资源
  65. future.channel().closeFuture().sync();
  66. }catch (Exception e){
  67. log.entry("server exception",e);
  68. }finally {
  69. //关闭RPC服务
  70. childGroup.shutdownGracefully();
  71. bossGroup.shutdownGracefully();
  72. }
  73. }
  74. private void registry() throws UnknownHostException {
  75. //注册RPC服务地址
  76. String hostAddress=InetAddress.getLocalHost().getHostAddress();
  77. int port=rpcProperties.getPort();
  78. for(String interfaceName:handlerMap.keySet()){
  79. ServiceInfo serviceInfo=
  80. new ServiceInfo(hostAddress,port,serviceRpcServiceMap.get(interfaceName).weight(),serviceRpcServiceMap.get(interfaceName).workerThreads());
  81. String serviceInfoString= JSON.toJSONString(serviceInfo);
  82. rpcServiceRegistry.register(interfaceName,serviceInfoString);
  83. log.info("register service:{}=>{}",interfaceName,serviceInfoString);
  84. }
  85. }
  86. }

RPC-Client

1 结构图

RPC-Client
新增RpcClientProperties提供配置属性读入(路由策略和序列化方式),ZKServiceDiscovery增加ConcurrentMap servicePathsMap来管理服务地址列表。RpcClient相应作出调整。

2 RpcClientProperties

注意这里属性用的是枚举类型而不是字符串,另外默认路由策略是随机,默认序列化策略是json

  1. @Data
  2. @ConfigurationProperties(prefix = "rpc.client")
  3. public class RpcClientProperties {
  4. private RouteStrategyEnum routeStrategy= RouteStrategyEnum.Random;
  5. private SerializeTypeEnum serializeType=SerializeTypeEnum.JSON;
  6. }

3 ZKServiceDiscovery

这里使用了IZkChildListener 来对目标路径下子节点变化进行监控,如果发生变化(新增或删减)则重新执行discover方法拉取最新服务地址列表。
zkChildListenerMap的作用是管理服务和对应的服务地址列表监听器,避免重复注册监听器。

  1. /** * @version V1.0 * @author: lin_shen * @date: 2018/10/31 * @Description: zookeeper服务注册中心 */
  2. @Component
  3. @Log4j2
  4. @EnableConfigurationProperties(ZKProperties.class)
  5. public class ZKServiceDiscovery {
  6. @Autowired
  7. private ZKProperties zkProperties;
  8. /** * 服务名和服务地址列表的Map */
  9. private ConcurrentMap<String,List<String>> servicePathsMap=new ConcurrentHashMap<>();
  10. /** * 服务监听器 Map,监听子节点服务信息 */
  11. private ConcurrentMap<String, IZkChildListener> zkChildListenerMap=new ConcurrentHashMap<>();
  12. private ZkClient zkClient;
  13. @PostConstruct
  14. public void init() {
  15. // 创建 ZooKeeper 客户端
  16. zkClient = new ZkClient(zkProperties.getAddress(), zkProperties.getSessionTimeOut(), zkProperties.getConnectTimeOut());
  17. log.info("connect to zookeeper");
  18. }
  19. /** * * 根据服务名获取服务地址并保持监控 * @param serviceName * @return */
  20. public void discover(String serviceName){
  21. log.info("discovering:"+serviceName);
  22. String servicePath=zkProperties.getRegistryPath()+"/"+serviceName;
  23. //找不到对应服务
  24. if(!zkClient.exists(servicePath)){
  25. throw new RuntimeException("can not find any service node on path: "+servicePath);
  26. }
  27. //获取服务地址列表
  28. List<String> addressList=zkClient.getChildren(servicePath);
  29. if(CollectionUtils.isEmpty(addressList)){
  30. throw new RuntimeException("can not find any address node on path: "+servicePath);
  31. }
  32. //保存地址列表
  33. List<String> paths=new ArrayList<>(addressList.size());
  34. for(String address:addressList){
  35. paths.add(zkClient.readData(servicePath+"/"+address));
  36. }
  37. servicePathsMap.put(serviceName,paths);
  38. //保持监控
  39. if(!zkChildListenerMap.containsKey(serviceName)){
  40. IZkChildListener iZkChildListener= (parentPath, currentChilds) -> {
  41. //当子节点列表变化时重新discover
  42. discover(serviceName);
  43. log.info("子节点列表发生变化 ");
  44. };
  45. zkClient.subscribeChildChanges(servicePath, iZkChildListener);
  46. zkChildListenerMap.put(serviceName,iZkChildListener);
  47. }
  48. }
  49. public List<String> getAddressList(String serviceName){
  50. List<String> addressList=servicePathsMap.get(serviceName);
  51. if(addressList==null||addressList.isEmpty()){
  52. discover(serviceName);
  53. return servicePathsMap.get(serviceName);
  54. }
  55. return addressList;
  56. }
  57. }

4 RpcClient

主要是配合RemotingTransporter做了调整和升级,整体变化不大。
另外一个要注意的就是 ConcurrentMap serviceRouteStrategyMap,用于在使用轮询策略时,为不同的服务调用保管对应的轮询器(轮询器内部存储index记录,是有状态的)。

  1. @Log4j2
  2. @Component
  3. @AutoConfigureAfter(ZKServiceDiscovery.class)
  4. @EnableConfigurationProperties(RpcClientProperties.class)
  5. public class RpcClient {
  6. @Autowired
  7. private ZKServiceDiscovery zkServiceDiscovery;
  8. @Autowired
  9. private RpcClientProperties rpcClientProperties;
  10. /** * 维持服务的 轮询 路由状态 * 不同服务状态不同(服务列表也不同) * 非轮询无需维持状态 */
  11. private ConcurrentMap<String,RouteStrategy> serviceRouteStrategyMap=new ConcurrentHashMap<>();
  12. /** * 存放请求编号与响应对象的映射关系 */
  13. private ConcurrentMap<Long, RemotingTransporter> remotingTransporterMap=new ConcurrentHashMap<>();
  14. @SuppressWarnings("unchecked")
  15. public <T> T create(final Class<?> interfaceClass){
  16. //创建动态代理对象
  17. return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
  18. new Class<?>[]{ interfaceClass},
  19. (proxy, method, args) -> {
  20. //创建RPC请求对象
  21. RpcRequest rpcRequest=new RpcRequest();
  22. rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
  23. rpcRequest.setMethodName(method.getName());
  24. rpcRequest.setParameterTypes(method.getParameterTypes());
  25. rpcRequest.setParameters(args);
  26. //获取RPC服务信息列表
  27. String serviceName=interfaceClass.getName();
  28. List<String> addressList=zkServiceDiscovery.getAddressList(serviceName);
  29. List<ServiceInfo> serviceInfoList=new ArrayList<>(addressList.size());
  30. for(String serviceInfoString:addressList){
  31. serviceInfoList.add(JSON.parseObject(serviceInfoString,ServiceInfo.class));
  32. }
  33. //根据配置文件获取路由策略
  34. log.info("使用负载均衡策略:"+rpcClientProperties.getRouteStrategy());
  35. log.info("使用序列化策略:"+rpcClientProperties.getSerializeType());
  36. RouteStrategy routeStrategy ;
  37. //如果使用轮询,则需要保存状态(按服务名保存)
  38. if(RouteStrategyEnum.Polling==rpcClientProperties.getRouteStrategy()){
  39. routeStrategy=serviceRouteStrategyMap.getOrDefault(serviceName,RouteEngine.queryClusterStrategy(RouteStrategyEnum.Polling));
  40. serviceRouteStrategyMap.put(serviceName,routeStrategy);
  41. }else {
  42. routeStrategy= RouteEngine.queryClusterStrategy(rpcClientProperties.getRouteStrategy());
  43. }
  44. //根据路由策略选取服务提供方
  45. ServiceInfo serviceInfo = routeStrategy.select(serviceInfoList);
  46. RemotingTransporter remotingTransporter=new RemotingTransporter();
  47. //设置flag为请求,双路,非ping,非其他,序列化方式为 配置文件中SerializeTypeEnum对应的code
  48. remotingTransporter.setFlag(new RemotingTransporter.Flag(true,true,false,false,rpcClientProperties.getSerializeType().getCode()));
  49. remotingTransporter.setBodyContent(rpcRequest);
  50. log.info("get serviceInfo:"+serviceInfo);
  51. //从RPC服务地址中解析主机名与端口号
  52. //发送RPC请求
  53. RpcResponse rpcResponse=send(remotingTransporter,serviceInfo.getHost(),serviceInfo.getPort());
  54. //获取响应结果
  55. if(rpcResponse==null){
  56. log.error("send request failure",new IllegalStateException("response is null"));
  57. return null;
  58. }
  59. if(rpcResponse.getException()!=null){
  60. log.error("response has exception",rpcResponse.getException());
  61. return null;
  62. }
  63. return rpcResponse.getResult();
  64. }
  65. );
  66. }
  67. private RpcResponse send(RemotingTransporter remotingTransporter,String host,int port){
  68. log.info("send begin: "+host+":"+port);
  69. //客户端线程为1即可
  70. EventLoopGroup group=new NioEventLoopGroup(1);
  71. try {
  72. //创建RPC连接
  73. Bootstrap bootstrap=new Bootstrap();
  74. bootstrap.group(group);
  75. bootstrap.channel(NioSocketChannel.class);
  76. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  77. @Override
  78. protected void initChannel(SocketChannel channel) throws Exception {
  79. ChannelPipeline pipeline=channel.pipeline();
  80. pipeline.addLast(new RemotingTransporterDecoder())
  81. .addFirst(new RemotingTransporterEncoder())
  82. .addLast(new RpcClientHandler(remotingTransporterMap));
  83. }
  84. });
  85. ChannelFuture future=bootstrap.connect(host,port).sync();
  86. Channel channel=future.channel();
  87. log.info("invokeId: "+remotingTransporter.getInvokeId());
  88. //写入RPC请求对象
  89. channel.writeAndFlush(remotingTransporter).sync();
  90. channel.closeFuture().sync();
  91. log.info("send end");
  92. //获取RPC响应对象
  93. return (RpcResponse) remotingTransporterMap.get(remotingTransporter.getInvokeId()).getBodyContent();
  94. }catch (Exception e){
  95. log.error("client exception",e);
  96. return null;
  97. }finally {
  98. group.shutdownGracefully();
  99. //移除请求编号和响应对象直接的映射关系
  100. remotingTransporterMap.remove(remotingTransporter.getInvokeId());
  101. }
  102. }
  103. }

发表评论

表情:
评论列表 (有 0 条评论,484人围观)

还没有评论,来说两句吧...

相关阅读