Eureka Server源码解析(服务注册流程)

爱被打了一巴掌 2022-09-08 12:55 301阅读 0赞

原创不易,转载请注明出处

文章目录

      • 前言
      • 1.Eureka Server 注册表详解
      • 2.服务注册源码
        • 2.1入口
        • 2.2 注册表注册
          • 2.2.1 更新注册表实例信息
          • 2.2.2 放入最近改变队列中
          • 2.2.3 失效本地缓存
        1. 注册流程图

前言

本文主要解析下Eureka Server 处理服务注册流程源码,Eureka Server处理服务注册个人认为是2部分,1是将服务实例信息注册到本机的注册表中(其实就是个map数据结构),2是同步给其他Eureka Server 节点,本文主要关注第一部分的流程,节点间的数据同步流程会另起文章介绍。

1.Eureka Server 注册表详解

Eureka Server服务启动的时候,会创建一个注册表对象,也就是PeerAwareInstanceRegistryImpl ,在Spring Cloud 封装的Eureka Server中是InstanceRegistry (继承的PeerAwareInstanceRegistryImpl)。
它里面有个核心的数据结构。

  1. private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
  2. = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

这个 registry map就是存放我们服务实例的地方,它的key 是String 类型,也就是我们的appName, value是个Map<String, Lease<InstanceInfo>>,里面这个map也是个ConcurrentHashMap 。里面这个map的key是个id,instanceId,value :Lease
维护的是服务的租约信息。

我们来看下Lease 里面都有啥

  1. private T holder;
  2. private long evictionTimestamp;
  3. private long registrationTimestamp;
  4. private long serviceUpTimestamp;
  5. // Make it volatile so that the expiration task would see this quicker
  6. private volatile long lastUpdateTimestamp;
  7. private long duration;

这里挑几个重点的介绍

  1. holder : 是你注册的实例信息,host,ip,port等等一些信息
  2. lastUpdateTimestamp :最后一次续约的信息。

所谓的续约就是心跳,比如说你一个服务注册到eureka server 上面,你会默认每隔30s时间来续约一次,告诉一下eureka server我这个服务实例还活着,如果你长时间没有向eureka server 进行服务续约,eureka server就会从注册表中将这个服务实例信息剔除。

  1. serviceUpTimestamp :服务启动时间,这个是服务注册的时候传输过来的。
  2. duration :可以认为是租期,默认是90s。如果你客户端有自己的租期,就用客户端带过来的。

2.服务注册源码

2.1入口

Eureka Server使用的web框架是jersey(这玩意定位就是restful 框架,跟springmvc 差不多,不需要太多的关注)
接收服务注册com.netflix.eureka.resources.ApplicationResource 的addInstance 方法。

  1. private final PeerAwareInstanceRegistry registry;
  2. @POST
  3. @Consumes({ "application/json", "application/xml"})
  4. public Response addInstance(InstanceInfo info,
  5. @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
  6. // 此处省略一堆代码
  7. registry.register(info, "true".equals(isReplication));
  8. return Response.status(204).build(); // 204 to be backwards compatible
  9. }

可以看到调用了注册表PeerAwareInstanceRegistry 的register方法。

2.2 注册表注册

PeerAwareInstanceRegistry#register

  1. @Override
  2. public void register(final InstanceInfo info, final boolean isReplication) {
  3. //默认90 持续时间 也就是90s的过期时间
  4. int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
  5. // 如果客户端存在过期时间的话,就是用客户端的
  6. if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
  7. leaseDuration = info.getLeaseInfo().getDurationInSecs();
  8. }
  9. // 1.调用父类AbstractInstanceRegistry 的注册方法进行注册,更新本地注册表
  10. super.register(info, leaseDuration, isReplication);
  11. // 2.集群间节点同步
  12. replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
  13. }

先是看看客户端有没有带过来过期时间,如果没有带过来就是用Eureka Server 默认的过期时间90s,表示90s后没有续约的话服务实例信息可能会被从注册表中剔除(为啥是可能会? 这个与服务剔除机制有关,它会有一定的延时)
接着就是调用父类的register方法,进行服务注册,其实就是更新本地服务注册表。
最后调用replicateToPeers 方法将服务信息同步给集群的其他节点。
这里我们最关心就是 super.register(info, leaseDuration, isReplication); 这行代码,也就是调用父类(AbstractInstanceRegistry)的register方法进行服务注册本地注册表的变更。

AbstractInstanceRegistry#register

  1. public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
  2. // 获取读锁
  3. read.lock();
  4. try {
  5. // ----------------------------------1-----------------------------------------------------
  6. // 先从注册表中 获取对应appName的 map集合
  7. Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
  8. REGISTER.increment(isReplication);
  9. // 不存在就创建 并且塞入注册表map中
  10. if (gMap == null) {
  11. final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
  12. //存在就不会put ,并将存在的那个返回来,不存在的话进行put,返回null
  13. // 试想该地使用putIfAbsent 的作用: double check, 虽然使用了ConcurrentHashMap,
  14. // 保证了元素增删改查没问题,但是还是不能保证 gNewMap 只存在一个。
  15. gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
  16. if (gMap == null) {
  17. gMap = gNewMap;
  18. }
  19. }
  20. // 获取该实例id 对应的租约信息
  21. Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
  22. // 如果是存在
  23. if (existingLease != null && (existingLease.getHolder() != null)) {
  24. Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
  25. Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
  26. logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
  27. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
  28. registrant = existingLease.getHolder();
  29. }
  30. // 如不存在的话
  31. } else {
  32. // The lease does not exist and hence it is a new registration
  33. synchronized (lock) {
  34. // 这个是与自我保护机制有关的
  35. if (this.expectedNumberOfClientsSendingRenews > 0) {
  36. // 每有一个新的客户端注册进来,就会+1,表示未来要发送心跳的客户端+1
  37. // Since the client wants to register it, increase the number of clients sending renews
  38. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
  39. // 更新下 自己能容忍的最少心跳数量
  40. updateRenewsPerMinThreshold();
  41. }
  42. }
  43. logger.debug("No previous lease information found; it is new registration");
  44. }
  45. Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
  46. if (existingLease != null) {
  47. // 服务启动时间ServiceUp
  48. lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
  49. }
  50. // 塞到注册表中
  51. gMap.put(registrant.getId(), lease);
  52. //放到registed 队列中
  53. recentRegisteredQueue.add(new Pair<Long, String>(
  54. System.currentTimeMillis(),
  55. registrant.getAppName() + "(" + registrant.getId() + ")"));
  56. // This is where the initial state transfer of overridden status happens
  57. if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
  58. logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
  59. + "overrides", registrant.getOverriddenStatus(), registrant.getId());
  60. if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
  61. logger.info("Not found overridden id {} and hence adding it", registrant.getId());
  62. overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
  63. }
  64. }
  65. // 从本地缓存中获取该实例状态
  66. InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
  67. if (overriddenStatusFromMap != null) {
  68. // 存在的话,设置进去
  69. logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
  70. registrant.setOverriddenStatus(overriddenStatusFromMap);
  71. }
  72. // Set the status based on the overridden status rules
  73. InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
  74. registrant.setStatusWithoutDirty(overriddenInstanceStatus);
  75. // If the lease is registered with UP status, set lease service up timestamp
  76. // 实例状态是up的话,设置下服务启动时间
  77. if (InstanceStatus.UP.equals(registrant.getStatus())) {
  78. lease.serviceUp();
  79. }
  80. registrant.setActionType(ActionType.ADDED);
  81. //------------2 ----------------------
  82. // 塞入最近改变队列中
  83. recentlyChangedQueue.add(new RecentlyChangedItem(lease));
  84. // 更新最后更新时间
  85. registrant.setLastUpdatedTimestamp();
  86. //------------3------------------------
  87. // 本地缓存失效
  88. invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
  89. } finally {
  90. read.unlock();
  91. }
  92. }

这个注册方法很长,我按照重要程度大概分为3步。需要注意的是整个方法被 读锁 包起来了。

  1. 更新注册表信息。
  2. 将信息塞入最近变更队列中, 更新最后更新时间
  3. 本地缓存失效(本地缓存很重要,服务发现的时候先会去本地的一级二级缓存中获取,没有的话采取注册表中找)
2.2.1 更新注册表实例信息
  1. // 先从注册表中 获取对应appName的 map集合
  2. Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
  3. REGISTER.increment(isReplication);
  4. // 不存在就创建 并且塞入注册表map中
  5. if (gMap == null) {
  6. final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
  7. //存在就不会put ,并将存在的那个返回来,不存在的话进行put,返回null
  8. // 试想该地使用putIfAbsent 的作用: double check, 虽然使用了ConcurrentHashMap,
  9. // 保证了元素增删改查没问题,但是还是不能保证 gNewMap 只存在一个。
  10. gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
  11. if (gMap == null) {
  12. gMap = gNewMap;
  13. }
  14. }

先是从注册表(也就是第一节介绍的那个注册表map )根据appName 获取对应 实例列表map,比如说我有一个叫good-center的服务,它起了很多的实例,集群部署的,这个时候它里面的那个map就维护了所有注册上来的实例续约信息(如图:当然这个实例id :192.168.0.3:8080是我自己编的,在spring cloud 中使用可以好好设计下,比如说:application name+ ip+ port+项目version 等等)
在这里插入图片描述
这里这段代码就是先根据appName 获取下 所有实例map,如果不存在的话,就创建。

  1. // 获取该实例id 对应的租约信息
  2. Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
  3. // 如果是存在
  4. if (existingLease != null && (existingLease.getHolder() != null)) {
  5. Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
  6. Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
  7. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
  8. logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
  9. " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
  10. logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
  11. registrant = existingLease.getHolder();
  12. }
  13. // 如不存在的话
  14. } else {
  15. // The lease does not exist and hence it is a new registration
  16. synchronized (lock) {
  17. // 这个是与自我保护机制有关的
  18. if (this.expectedNumberOfClientsSendingRenews > 0) {
  19. // 每有一个新的客户端注册进来,就会+1,表示未来要发送心跳的客户端+1
  20. // Since the client wants to register it, increase the number of clients sending renews
  21. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
  22. // 更新下 自己能容忍的最少心跳数量
  23. updateRenewsPerMinThreshold();
  24. }
  25. }
  26. logger.debug("No previous lease information found; it is new registration");
  27. }

接着就是根据实例id获取下实例租约信息,如果之前就在注册表中存在的话,看看要不要使用之前的instance信息,前提是没有变更过,通过变更时间来判断的。
如果是第一次注册(或者是之前注册过,然后服务下线了)的话,这里面肯定是没有的,就会将expectedNumberOfClientsSendingRenews 自加1 ,expectedNumberOfClientsSendingRenews 这个变量表示的是需要续约的客户端数量。最后调用updateRenewsPerMinThreshold 方法,更新一下下次接收续约(心跳)最少的阈值(这个是与 服务剔除、自我保护机制有关的,可以看下《深度解析Eureka的自我保护机制》文章,由于eureka 版本不一样,代码实现可能不一样,但是核心思想是不变的)

  1. Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
  2. if (existingLease != null) {
  3. // 服务启动时间ServiceUp
  4. lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
  5. }
  6. // 塞到注册表中
  7. gMap.put(registrant.getId(), lease);

重新生成租约信息,如果是注册表中已经存在这个实例,就将原来的实例启动时间 设置到新的租约信息中,然后存储到注册表中。

2.2.2 放入最近改变队列中
  1. // 塞入最近改变队列中
  2. recentlyChangedQueue.add(new RecentlyChangedItem(lease));
  3. // 更新最后更新时间
  4. registrant.setLastUpdatedTimestamp();

这个放入最近改变队列中,为了后续使用,就放3分钟,到期后就从队列中剔除了。
更新最后更新时间,这个也是非常重要的,后续要根据这个时间进行服务剔除。

2.2.3 失效本地缓存
  1. // 本地缓存失效
  2. invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());

这块我们知道就可以了,这个本地缓存服务发现的时候会用到,会先从本地的多级缓存中获取这些实例信息。这里实例信息改变了,就需要清除下本地的缓存,保证一致性。

3. 注册流程图

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,301人围观)

还没有评论,来说两句吧...

相关阅读