spring-cloud-eureka (Part 3): Registry Server Source Code Analysis

野性酷女 2022-06-03 02:53

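Eureka is an open-source service-governance framework. It provides complete Service Registry and Service Discovery implementations and integrates seamlessly with Spring Cloud, so a registry server can be set up easily with Spring Boot + Spring Cloud.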

Eureka Architecture

Basic Architecture

[Figure: Eureka basic architecture]
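The figure above gives a simple picture of Eureka's basic structure, which consists of three roles:

  • Eureka Server: maintains service information, including instance information, and provides the basic service-governance functions such as service registration and service discovery.
  • Service Provider: registers the services it provides with the Eureka Server so that service consumers can find them there.
  • Service Consumer: fetches the list of registered services from the Eureka Server and then calls the relevant services.

These three roles are abstract, logical roles. In practice, a single instance can play more than one of them.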

High-Availability Architecture

[Figure: Eureka high-availability architecture]

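The figure above shows the interactions among the three roles in more detail.

  • Service providers send registration, renewal, and cancellation requests to the Eureka Server.
  • Eureka Servers replicate registrations among themselves to keep service state consistent.
  • Service consumers fetch the list of registered services from the Eureka Server and then call the services.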

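Source Code Analysis

Service Registry

The Eureka Server maintains a service registry. The registry is a two-level Map. The first-level key is the service (application) name, and the second-level key is the instance ID.

The Eureka Server configuration class org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration shows that the instance registry it publishes is org.springframework.cloud.netflix.eureka.server.InstanceRegistry.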
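The two-level structure can be sketched in plain Java. This is a minimal illustration, not Eureka's code. `TwoLevelRegistrySketch` and its methods are hypothetical, and the second-level value is a plain `String` where Eureka stores a `Lease<InstanceInfo>`. The sketch uses the same `get` / `putIfAbsent` idiom as `AbstractInstanceRegistry.register` to create the per-application map safely under concurrency.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a two-level registry: appName -> (instanceId -> payload).
class TwoLevelRegistrySketch {
    private final ConcurrentHashMap<String, Map<String, String>> registry =
            new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String payload) {
        // First level: create the per-application map only if it is missing.
        Map<String, String> instances = registry.get(appName);
        if (instances == null) {
            ConcurrentHashMap<String, String> fresh = new ConcurrentHashMap<>();
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh; // our map won the race
            }
        }
        // Second level: instance ID -> payload (a Lease<InstanceInfo> in Eureka).
        instances.put(instanceId, payload);
    }

    // Read-only view of one application's instances (empty if the app is unknown).
    Map<String, String> instancesOf(String appName) {
        Map<String, String> instances = registry.get(appName);
        return instances == null
                ? Collections.<String, String>emptyMap()
                : Collections.unmodifiableMap(instances);
    }
}
```

On Java 8 and later, `registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())` does the first-level step in a single call.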


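The InstanceRegistry class extends PeerAwareInstanceRegistryImpl, so it supports clustering.
There is also an InstanceRegistry interface, with the fully qualified name com.netflix.eureka.registry.InstanceRegistry. The org.springframework.cloud.netflix.eureka.server.InstanceRegistry class implements it through its parent classes. As the name suggests, the interface represents an instance registry. It extends the LeaseManager and LookupService interfaces.

  • LookupService mainly looks up service instances that are running normally.
  • LeaseManager maintains the list of available services. It models how long a service stays available as a lease, and it is responsible for creating, renewing, and cancelling an instance's lease.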

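Publishing Events

For service registration, renewal, and cancellation, the org.springframework.cloud.netflix.eureka.server.InstanceRegistry class first publishes a corresponding event and then calls the parent class's method.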

    ...
    public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware {
        ...
        // Service registration
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
            super.register(info, isReplication);
        }
        // Service cancellation
        @Override
        public boolean cancel(String appName, String serverId, boolean isReplication) {
            handleCancelation(appName, serverId, isReplication);
            return super.cancel(appName, serverId, isReplication);
        }
        // Service renewal
        @Override
        public boolean renew(final String appName, final String serverId,
                boolean isReplication) {
            log("renew " + appName + " serverId " + serverId + ", isReplication {}"
                    + isReplication);
            List<Application> applications = getSortedApplications();
            for (Application input : applications) {
                if (input.getName().equals(appName)) {
                    InstanceInfo instance = null;
                    for (InstanceInfo info : input.getInstances()) {
                        if (info.getId().equals(serverId)) {
                            instance = info;
                            break;
                        }
                    }
                    publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                            instance, isReplication));
                    break;
                }
            }
            return super.renew(appName, serverId, isReplication);
        }
        ...
    }

Operation               Event
Service registration    EurekaInstanceRegisteredEvent
Service renewal         EurekaInstanceRenewedEvent
Service cancellation    EurekaInstanceCanceledEvent
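The publish-then-delegate pattern behind this table can be illustrated without Spring. In this sketch all names are hypothetical. `BaseRegistrySketch` stands in for `PeerAwareInstanceRegistryImpl`, `EventPublishingRegistrySketch` stands in for Spring's `InstanceRegistry`, and a `java.util.function.Consumer<String>` that receives plain string events replaces Spring's event publishing.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Stand-in for the parent registry (hypothetical): keeps a set of "app/id" keys.
class BaseRegistrySketch {
    protected final Set<String> instances = new HashSet<>();

    void register(String appName, String id) {
        instances.add(appName + "/" + id);
    }

    boolean cancel(String appName, String id) {
        return instances.remove(appName + "/" + id);
    }
}

// Stand-in for Spring's InstanceRegistry: publish an event first, then call super.
class EventPublishingRegistrySketch extends BaseRegistrySketch {
    private final Consumer<String> publisher;

    EventPublishingRegistrySketch(Consumer<String> publisher) {
        this.publisher = publisher;
    }

    @Override
    void register(String appName, String id) {
        publisher.accept("Registered " + appName + "/" + id);
        super.register(appName, id);
    }

    @Override
    boolean cancel(String appName, String id) {
        publisher.accept("Canceled " + appName + "/" + id);
        return super.cancel(appName, id);
    }
}
```

As in the quoted `cancel` method, the event is published before the parent method runs. A listener can therefore receive a cancellation event even when the parent returns false because the instance was never registered.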

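Eureka Server Cluster Synchronization

InstanceRegistry extends PeerAwareInstanceRegistryImpl, so its registration, renewal, and cancellation methods end by calling the corresponding logic in PeerAwareInstanceRegistryImpl. PeerAwareInstanceRegistryImpl mainly adds a broadcast capability: it replicates instance registrations, renewals, cancellations, and similar operations to the other Eureka Servers.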

com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl

    package com.netflix.eureka.registry;
    ...
    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
        ...
        @Override
        public boolean cancel(final String appName, final String id,
                final boolean isReplication) {
            // Call the parent class method
            if (super.cancel(appName, id, isReplication)) {
                // Broadcast to peers
                replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
                ...
                return true;
            }
            return false;
        }
        ...
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            // Call the parent class method
            super.register(info, leaseDuration, isReplication);
            // Broadcast to peers
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
        ...
        public boolean renew(final String appName, final String id, final boolean isReplication) {
            // Call the parent class method
            if (super.renew(appName, id, isReplication)) {
                // Broadcast to peers
                replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
                return true;
            }
            return false;
        }
        ...
        // Sends replication messages. If the request was itself replicated from another
        // Eureka Server, it returns without replicating again.
        private void replicateToPeers(Action action, String appName, String id,
                InstanceInfo info /* optional */,
                InstanceStatus newStatus /* optional */, boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                if (isReplication) {
                    numberOfReplicationsLastMin.increment();
                }
                // If it is a replication already, do not replicate again as this will create a poison replication
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }
                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // If the url represents this host, do not replicate to yourself.
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }
        // Performs the replication call; PeerEurekaNode.replicationClient ultimately
        // delegates to JerseyReplicationClient
        private void replicateInstanceActionsToPeers(Action action, String appName,
                String id, InstanceInfo info, InstanceStatus newStatus,
                PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry = null;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
            }
        }
        ...
    }
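The two guards in `replicateToPeers` prevent replication loops. The first stops if the call is already a replication, and the second skips the node's own URL. The simulation below is a hypothetical sketch. It assumes a fully connected in-memory cluster and calls `receive` directly where Eureka makes an HTTP call through `PeerEurekaNode`. The names `PeerNodeSketch`, `receive`, and `replicationsSent` are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory peer; "url" identifies the node, like PeerEurekaNode.getServiceUrl().
class PeerNodeSketch {
    final String url;
    final List<PeerNodeSketch> peers = new ArrayList<>(); // may include this node, like a configured peer list
    final Set<String> registry = new HashSet<>();
    int replicationsSent = 0;

    PeerNodeSketch(String url) {
        this.url = url;
    }

    // Handles both client calls (isReplication=false) and peer calls (isReplication=true).
    void receive(String instanceId, boolean isReplication) {
        registry.add(instanceId);              // 1. apply the operation locally first
        if (isReplication) {
            return;                            // 2. never re-replicate a replicated request
        }
        for (PeerNodeSketch node : peers) {
            if (node.url.equals(this.url)) {
                continue;                      // 3. do not replicate to yourself
            }
            replicationsSent++;
            node.receive(instanceId, true);    // Eureka sends this over HTTP instead
        }
    }
}
```

A client registration on node A reaches B and C exactly once each. Without the `isReplication` check, B and C would forward the registration back to A and to each other, and the calls would recurse forever. This is the "poison replication" that the source comment warns about.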

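During replication, messages are sent through the replicationClient field of the PeerEurekaNode object. This field holds an implementation of the com.netflix.eureka.cluster.HttpReplicationClient interface, which by default is com.netflix.eureka.transport.JerseyReplicationClient.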

[Figure: class hierarchy of JerseyReplicationClient]

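As the figure above shows, this class is a subclass of AbstractJerseyEurekaHttpClient. We analyzed AbstractJerseyEurekaHttpClient in the previous chapter, so here we only look at JerseyReplicationClient.

JerseyReplicationClient extends the service renewal and status update calls. Apart from that, its main addition is setting the HTTP request header x-netflix-discovery-replication=true.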

com.netflix.eureka.transport.JerseyReplicationClient

    public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {
        ...
        @Override
        protected void addExtraHeaders(Builder webResource) {
            webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
        }
        ...
    }

com.netflix.eureka.cluster.PeerEurekaNode

    public class PeerEurekaNode {
        ...
        public static final String HEADER_REPLICATION = "x-netflix-discovery-replication";
        ...
    }
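On the receiving side, this header lets a server tell a peer's replicated request apart from an ordinary client request. The helper below is a hypothetical sketch. Only the header name comes from `PeerEurekaNode`. The class, the method, and the use of a plain `Map` for request headers are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical helper: decide whether an incoming request came from a peer Eureka Server.
final class ReplicationHeaderSketch {
    // Header name as defined in com.netflix.eureka.cluster.PeerEurekaNode.
    static final String HEADER_REPLICATION = "x-netflix-discovery-replication";

    private ReplicationHeaderSketch() {
    }

    static boolean isReplication(Map<String, String> headers) {
        for (Map.Entry<String, String> e : headers.entrySet()) {
            // HTTP header names are case-insensitive, so compare ignoring case.
            if (HEADER_REPLICATION.equalsIgnoreCase(e.getKey())) {
                return "true".equalsIgnoreCase(e.getValue());
            }
        }
        return false; // no header: an ordinary client request
    }
}
```

The lookup ignores case because HTTP header names are case-insensitive, so a proxy that changes the capitalization does not break the check.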

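Replication between Eureka Servers happens after the local operation has completed.

Service Registration

Take service registration as an example. Replication runs only after the registration itself has completed, so PeerAwareInstanceRegistryImpl first calls the corresponding logic in its parent class AbstractInstanceRegistry and replicates afterwards. Neither InstanceRegistry nor PeerAwareInstanceRegistryImpl contains the actual registration logic, so it must live in their common parent class, com.netflix.eureka.registry.AbstractInstanceRegistry.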


    package com.netflix.eureka.registry;
    ...
    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        ...
        // The service registry
        private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
                = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
        ...
        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            try {
                read.lock();
                // Get all instances of the service
                Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
                REGISTER.increment(isReplication);
                if (gMap == null) {
                    final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                    gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                    if (gMap == null) {
                        gMap = gNewMap;
                    }
                }
                // Get the instance's existing lease; null if it has never registered before
                Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
                // Retain the last dirty timestamp without overwriting it, if there is already a lease
                if (existingLease != null && (existingLease.getHolder() != null)) {
                    Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                    Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                    logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                        logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                                " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                        logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                        registrant = existingLease.getHolder();
                    }
                } else {
                    // The lease does not exist and hence it is a new registration
                    synchronized (lock) {
                        if (this.expectedNumberOfRenewsPerMin > 0) {
                            // Since the client wants to cancel it, reduce the threshold
                            // (1 for 30 seconds, 2 for a minute)
                            this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                            this.numberOfRenewsPerMinThreshold =
                                    (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                        }
                    }
                    logger.debug("No previous lease information found; it is new registration");
                }
                // Create a new lease
                Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
                if (existingLease != null) {
                    // If the instance registered before, keep its previous service-up timestamp.
                    // (I have not figured out the purpose of this step.)
                    lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
                }
                // Put the lease into the service's instance map
                gMap.put(registrant.getId(), lease);
                synchronized (recentRegisteredQueue) {
                    recentRegisteredQueue.add(new Pair<Long, String>(
                            System.currentTimeMillis(),
                            registrant.getAppName() + "(" + registrant.getId() + ")"));
                }
                // This is where the initial state transfer of overridden status happens
                if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                    logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                    if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                        logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                        overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                    }
                }
                InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
                if (overriddenStatusFromMap != null) {
                    logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                    registrant.setOverriddenStatus(overriddenStatusFromMap);
                }
                // Set the status based on the overridden status rules
                InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
                registrant.setStatusWithoutDirty(overriddenInstanceStatus);
                // If the lease is registered with UP status, set lease service up timestamp
                if (InstanceStatus.UP.equals(registrant.getStatus())) {
                    // Mark the lease as up (records the service-up timestamp)
                    lease.serviceUp();
                }
                registrant.setActionType(ActionType.ADDED);
                // Record this change in the recently-changed queue
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                registrant.setLastUpdatedTimestamp();
                invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
                logger.info("Registered instance {}/{} with status {} (replication={})",
                        registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
            } finally {
                read.unlock();
            }
        }
        ...
    }

That is the main service registration logic in the Eureka Server.
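Two details of `register()` are easy to miss. When a lease already exists, the copy with the newer `lastDirtyTimestamp` wins. On re-registration, the new lease also inherits the old service-up timestamp. The sketch below isolates those two rules. `InstanceSketch`, `LeaseSketch`, and `RegistrationRules` are hypothetical simplifications of `InstanceInfo` and `Lease`, and time is passed in explicitly. The sketch also assumes that `serviceUp()` only records a timestamp while none is set yet, so verify this against `Lease.serviceUp()` in your Eureka version.

```java
// Hypothetical, simplified stand-ins for InstanceInfo and Lease<InstanceInfo>.
class InstanceSketch {
    final String id;
    final long lastDirtyTimestamp;

    InstanceSketch(String id, long lastDirtyTimestamp) {
        this.id = id;
        this.lastDirtyTimestamp = lastDirtyTimestamp;
    }
}

class LeaseSketch {
    final InstanceSketch holder;
    long serviceUpTimestamp; // 0 means "not marked up yet"

    LeaseSketch(InstanceSketch holder) {
        this.holder = holder;
    }

    // Assumption: only the first call records a timestamp; later calls are ignored.
    void serviceUp(long now) {
        if (serviceUpTimestamp == 0) {
            serviceUpTimestamp = now;
        }
    }
}

final class RegistrationRules {
    private RegistrationRules() {
    }

    // Builds the lease that register() would store, given the lease already in the map (or null).
    static LeaseSketch register(InstanceSketch registrant, LeaseSketch existing, long now) {
        // Rule 1: if the stored copy has a newer dirty timestamp, keep it instead of the incoming one.
        if (existing != null && existing.holder != null
                && existing.holder.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing.holder;
        }
        LeaseSketch lease = new LeaseSketch(registrant);
        // Rule 2: carry over the previous service-up time on re-registration.
        if (existing != null) {
            lease.serviceUpTimestamp = existing.serviceUpTimestamp;
        }
        lease.serviceUp(now); // assuming the instance registers with status UP
        return lease;
    }
}
```

Under that assumption, carrying the old timestamp over means the later `serviceUp()` call leaves it untouched, so an instance that simply re-registers keeps its original up time.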

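Service Cancellation

Once the registration logic is clear, cancellation is much simpler. It mainly removes the instance from the service's instance map and records the time and details of the removal.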


    package com.netflix.eureka.registry;
    ...
    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        ...
        @Override
        public boolean cancel(String appName, String id, boolean isReplication) {
            return internalCancel(appName, id, isReplication);
        }
        /**
         * {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each
         * cancel request is replicated to the peers. This is however not desired for expires which would be counted
         * in the remote peers as valid cancellations, so self preservation mode would not kick-in.
         */
        protected boolean internalCancel(String appName, String id, boolean isReplication) {
            try {
                read.lock();
                // Count cancellations
                CANCEL.increment(isReplication);
                Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
                Lease<InstanceInfo> leaseToCancel = null;
                if (gMap != null) {
                    // Remove the cancelled instance
                    leaseToCancel = gMap.remove(id);
                }
                synchronized (recentCanceledQueue) {
                    recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
                }
                InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
                if (instanceStatus != null) {
                    logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
                }
                if (leaseToCancel == null) {
                    // Count cancellations for instances that were not found
                    CANCEL_NOT_FOUND.increment(isReplication);
                    logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                    return false;
                } else {
                    // Record the cancellation/eviction time
                    leaseToCancel.cancel();
                    InstanceInfo instanceInfo = leaseToCancel.getHolder();
                    String vip = null;
                    String svip = null;
                    if (instanceInfo != null) {
                        instanceInfo.setActionType(ActionType.DELETED);
                        // Record the cancellation/eviction in the recently-changed queue
                        recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                        instanceInfo.setLastUpdatedTimestamp();
                        vip = instanceInfo.getVIPAddress();
                        svip = instanceInfo.getSecureVipAddress();
                    }
                    invalidateCache(appName, vip, svip);
                    logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                    return true;
                }
            } finally {
                read.unlock();
            }
        }
        ...
    }
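The Javadoc on `internalCancel` explains why eviction does not go through `cancel()`. The peer-aware subclass overrides `cancel()` to replicate, and replicated expirations would look like valid cancellations on the peers. The hypothetical sketch below shows that split. `LocalRegistrySketch`, `ReplicatingRegistrySketch`, and the `replicatedCancels` counter are invented names, and the counter stands in for `replicateToPeers(Action.Cancel, ...)`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical base registry: cancel() and evict() both end in internalCancel().
class LocalRegistrySketch {
    protected final Set<String> instances = new HashSet<>();

    boolean cancel(String id) {
        return internalCancel(id);
    }

    protected boolean internalCancel(String id) {
        return instances.remove(id);
    }

    // Expired leases go straight to internalCancel(), so an overridden cancel() never sees them.
    void evict(String expiredId) {
        internalCancel(expiredId);
    }
}

// Hypothetical peer-aware subclass: only explicit cancellations are replicated.
class ReplicatingRegistrySketch extends LocalRegistrySketch {
    int replicatedCancels = 0;

    @Override
    boolean cancel(String id) {
        if (super.cancel(id)) {
            replicatedCancels++; // stands in for replicateToPeers(Action.Cancel, ...)
            return true;
        }
        return false;
    }
}
```

An explicit cancel is counted as replicated. An eviction removes the instance locally and leaves the counter unchanged, so each peer decides about expirations with its own eviction task.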

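Service Renewal

Service renewal is a heartbeat mechanism. At regular intervals, a service instance reports to the Eureka Server that it is still healthy, so that the eviction mechanism does not remove it. In the code, renewal updates the lease's last update timestamp, and renew() sets that timestamp to the current time plus one lease duration (90 seconds by default). The expiration check then adds the lease duration a second time. As a result, by default an instance that has not renewed for 180 seconds is considered unable to serve.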
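The 180-second figure follows from the two `Lease` methods quoted in this section: `renew()` stores now + duration, and `isExpired()` adds the duration again. The sketch below reproduces that arithmetic with an explicit clock so it can be checked without waiting. `LeaseClockSketch` is a hypothetical class, not Eureka's `Lease`.

```java
// Hypothetical simplified lease with an explicit clock (milliseconds),
// mirroring the renew()/isExpired() arithmetic of Eureka's Lease class.
class LeaseClockSketch {
    private final long duration;       // lease duration in ms (90 s by default in Eureka)
    private long lastUpdateTimestamp;

    LeaseClockSketch(int durationInSecs, long now) {
        this.duration = durationInSecs * 1000L;
        this.lastUpdateTimestamp = now;  // registration time, as in the Lease constructor
    }

    // Like Lease.renew(): stores "now + duration", not just "now".
    void renew(long now) {
        lastUpdateTimestamp = now + duration;
    }

    // Like Lease.isExpired(0) for a lease that has not been cancelled: adds duration again.
    boolean isExpired(long now) {
        return now > lastUpdateTimestamp + duration;
    }
}
```

A lease that has never been renewed expires after a single duration, because the constructor stores the plain registration time. The effective window becomes twice the duration only after the first `renew()`.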


    package com.netflix.eureka.registry;
    ...
    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        ...
        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
            ...
            // The lease duration becomes a field of the lease; the value is passed in
            // by the subclass PeerAwareInstanceRegistryImpl
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            ...
        }
        ...
        // Service renewal logic
        public boolean renew(String appName, String id, boolean isReplication) {
            RENEW.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToRenew = null;
            if (gMap != null) {
                leaseToRenew = gMap.get(id);
            }
            if (leaseToRenew == null) {
                RENEW_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
                return false;
            } else {
                InstanceInfo instanceInfo = leaseToRenew.getHolder();
                if (instanceInfo != null) {
                    // touchASGCache(instanceInfo.getASGName());
                    InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                            instanceInfo, leaseToRenew, isReplication);
                    if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                        logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                                + "; re-register required", instanceInfo.getId());
                        RENEW_NOT_FOUND.increment(isReplication);
                        return false;
                    }
                    if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                        Object[] args = {
                                instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId()
                        };
                        logger.info(
                                "The instance status {} is different from overridden instance status {} for instance {}. "
                                        + "Hence setting the status to overridden status", args);
                        instanceInfo.setStatus(overriddenInstanceStatus);
                    }
                }
                renewsLastMin.increment();
                // Update the renewal time (last update timestamp)
                leaseToRenew.renew();
                return true;
            }
        }
        ...
    }
    public class Lease<T> {
        ...
        // Default lease duration
        public static final int DEFAULT_DURATION_IN_SECS = 90;
        ...
        private long duration;
        public Lease(T r, int durationInSecs) {
            holder = r;
            registrationTimestamp = System.currentTimeMillis();
            lastUpdateTimestamp = registrationTimestamp;
            duration = (durationInSecs * 1000);
        }
        // Renewal updates the last update timestamp
        public void renew() {
            lastUpdateTimestamp = System.currentTimeMillis() + duration;
        }
        ...
    }
    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
        ...
        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            // Start with the default lease duration
            int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
            if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
                // If the instance specifies its own lease duration, use it instead
                leaseDuration = info.getLeaseInfo().getDurationInSecs();
            }
            super.register(info, leaseDuration, isReplication);
            replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
        }
        ...
    }

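Service Eviction

As noted above, the eviction mechanism removes a service instance that has not renewed (updated its last update timestamp) for 180 seconds.

Eviction also has a safeguard. Suppose that during a given period the number of renewals the server receives falls below the renewal threshold. The threshold is 85% of the maximum number of heartbeats, which is instances × 2 per minute because each instance sends a heartbeat every 30 s by default. In that case the Eureka Server protects the current instance information so that these instances do not expire. This requires self-preservation to be enabled. It is enabled by default and can be disabled with eureka.server.enable-self-preservation=false.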
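The threshold arithmetic can be checked with a few lines of Java. `SelfPreservationMath` is a hypothetical helper. It assumes the 30-second heartbeat interval (2 renewals per instance per minute) and the default `renewalPercentThreshold` of 0.85 described above. Its last method has the same shape as `isLeaseExpirationEnabled()` in `PeerAwareInstanceRegistryImpl`.

```java
// Hypothetical helper reproducing the self-preservation arithmetic.
final class SelfPreservationMath {
    private SelfPreservationMath() {
    }

    // Expected renewals per minute: one heartbeat every 30 s means 2 per instance per minute.
    static int expectedRenewsPerMin(int instances) {
        return instances * 2;
    }

    // Threshold = expected * renewalPercentThreshold (0.85 by default), truncated to int.
    static int renewalThreshold(int instances, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instances) * renewalPercentThreshold);
    }

    // Same shape as isLeaseExpirationEnabled(): leases may expire only above the threshold.
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled,
                                          int threshold, long renewsLastMin) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsLastMin > threshold;
    }
}
```

With 10 registered instances, the expected rate is 20 renewals per minute and the threshold is (int) (20 * 0.85) = 17. Once 17 or fewer renewals arrive in a minute, the server stops expiring leases.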


    public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        ...
        @Override
        public void evict() {
            evict(0l);
        }
        // Service eviction logic
        public void evict(long additionalLeaseMs) {
            logger.debug("Running the evict task");
            // Self-preservation check; implemented in PeerAwareInstanceRegistryImpl
            if (!isLeaseExpirationEnabled()) {
                logger.debug("DS: lease expiration is currently disabled.");
                return;
            }
            // We collect first all expired items, to evict them in random order. For large eviction sets,
            // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
            // the impact should be evenly distributed across all applications.
            List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
            for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
                Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
                if (leaseMap != null) {
                    for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                        Lease<InstanceInfo> lease = leaseEntry.getValue();
                        // Check whether the lease has expired
                        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                            expiredLeases.add(lease);
                        }
                    }
                }
            }
            // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
            // triggering self-preservation. Without that we would wipe out full registry.
            int registrySize = (int) getLocalRegistrySize();
            int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
            int evictionLimit = registrySize - registrySizeThreshold;
            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
                // Seed the generator with the current time to avoid repeating the same pseudo-random sequence
                Random random = new Random(System.currentTimeMillis());
                for (int i = 0; i < toEvict; i++) {
                    // Pick a random item (Knuth shuffle algorithm)
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    // Swap a randomly chosen later element into the current position (i)
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = expiredLeases.get(i);
                    String appName = lease.getHolder().getAppName();
                    String id = lease.getHolder().getId();
                    EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    // Evict the instance
                    internalCancel(appName, id, false);
                }
            }
        }
        ...
    }
    public class Lease<T> {
        ...
        // Default lease duration
        public static final int DEFAULT_DURATION_IN_SECS = 90;
        ...
        private long duration;
        public Lease(T r, int durationInSecs) {
            holder = r;
            registrationTimestamp = System.currentTimeMillis();
            lastUpdateTimestamp = registrationTimestamp;
            duration = (durationInSecs * 1000);
        }
        // Renewal updates the last update timestamp
        public void renew() {
            lastUpdateTimestamp = System.currentTimeMillis() + duration;
        }
        // Expired if the lease was already cancelled (evictionTimestamp > 0), or if
        // current time > last update timestamp + lease duration (+ additionalLeaseMs)
        public boolean isExpired(long additionalLeaseMs) {
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
        }
        ...
    }
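`evict()` combines two safeguards. It caps the number of leases removed in one run, and it picks them at random with a partial Knuth (Fisher-Yates) shuffle. The sketch below isolates both. `EvictionSketch` and its methods are hypothetical, and unlike `evict()`, which shuffles its own list in place, the sketch copies the input list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the two safeguards in evict(): an eviction cap and random selection.
final class EvictionSketch {
    private EvictionSketch() {
    }

    // At most registrySize - (int) (registrySize * renewalPercentThreshold) leases per run.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Partial Fisher-Yates shuffle: the first toEvict slots end up holding a random subset.
    static <T> List<T> pickToEvict(List<T> expired, int limit, Random random) {
        List<T> candidates = new ArrayList<>(expired); // do not mutate the caller's list
        int toEvict = Math.min(candidates.size(), limit);
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(candidates.size() - i);
            Collections.swap(candidates, i, next);
        }
        return new ArrayList<>(candidates.subList(0, toEvict));
    }
}
```

With 100 registered instances the cap is 100 - (int) (100 * 0.85) = 15. If 30 leases have expired, one run evicts 15 randomly chosen leases and leaves the rest for later runs, which gives self-preservation a chance to kick in first.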

Next, let's look at the logic of the self-preservation mechanism.

    public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
        ...
        // Periodically update the renewal threshold (every 15 minutes by default)
        private void scheduleRenewalThresholdUpdateTask() {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    updateRenewalThreshold();
                }
            }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                    serverConfig.getRenewalThresholdUpdateIntervalMs());
        }
        ...
        // Self-preservation check
        @Override
        public boolean isLeaseExpirationEnabled() {
            if (!isSelfPreservationModeEnabled()) {
                // The self preservation mode is disabled, hence allowing the instances to expire.
                return true;
            }
            // Leases may expire only if renewals in the last minute exceed the threshold
            // (85% of the expected renewals); otherwise self-preservation blocks expiration
            return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
        }
        // Whether self-preservation is enabled
        @Override
        public boolean isSelfPreservationModeEnabled() {
            return serverConfig.shouldEnableSelfPreservation();
        }
        ...
        // Update the renewal threshold
        private void updateRenewalThreshold() {
            try {
                Applications apps = eurekaClient.getApplications();
                int count = 0;
                for (Application app : apps.getRegisteredApplications()) {
                    for (InstanceInfo instance : app.getInstances()) {
                        if (this.isRegisterable(instance)) {
                            ++count;
                        }
                    }
                }
                synchronized (lock) {
                    // Update threshold only if the threshold is greater than the
                    // current expected threshold of if the self preservation is disabled.
                    // Condition for recalculating the threshold (this is somewhat problematic and can keep
                    // self-preservation active indefinitely): if the maximum heartbeats per minute (count * 2)
                    // do not exceed the expected minimum renewals per minute
                    // (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold), nothing is recalculated
                    if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
                            || (!this.isSelfPreservationModeEnabled())) {
                        this.expectedNumberOfRenewsPerMin = count * 2;
                        this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
            } catch (Throwable e) {
                logger.error("Cannot update renewal threshold", e);
            }
        }
        ...
    }

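The code above contains two pieces of logic:

  • The self-preservation check
    isLeaseExpirationEnabled() is the entry point. It first checks whether self-preservation is enabled, which it is by default. It then checks whether the number of renewals received in the last minute exceeds the renewal threshold, which by default is 85% of (registered instances × 2). The factor of 2 comes from heartbeats being sent every 30 seconds, while the eviction task runs once a minute. Leases may expire only when the threshold is exceeded.
  • Updating the self-preservation threshold
    scheduleRenewalThresholdUpdateTask() is the entry point. It creates a scheduled task that updates the renewal threshold every 15 minutes by default.
    updateRenewalThreshold() contains the update logic. When the maximum number of heartbeats per minute (count * 2) does not exceed the expected minimum renewals per minute (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold), the threshold is not recalculated. If the service instances really have become unavailable, the server then stays in self-preservation indefinitely.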
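The stuck state can be reproduced with the update rule alone. `ThresholdUpdateSketch` is a hypothetical class whose `update(count)` copies the condition from `updateRenewalThreshold()`. The instance counts used below are made-up numbers chosen for illustration, not measurements.

```java
// Hypothetical sketch of the update rule in updateRenewalThreshold();
// field names mirror the ones in the quoted code.
class ThresholdUpdateSketch {
    int expectedNumberOfRenewsPerMin;
    int numberOfRenewsPerMinThreshold;
    private final double renewalPercentThreshold;  // 0.85 by default
    private final boolean selfPreservationEnabled;

    ThresholdUpdateSketch(double renewalPercentThreshold, boolean selfPreservationEnabled) {
        this.renewalPercentThreshold = renewalPercentThreshold;
        this.selfPreservationEnabled = selfPreservationEnabled;
    }

    // count: number of registerable instances found by the periodic task
    void update(int count) {
        // Recalculate only if count * 2 exceeds 85% of the current threshold, or self-preservation is off.
        if ((count * 2) > (renewalPercentThreshold * numberOfRenewsPerMinThreshold)
                || !selfPreservationEnabled) {
            expectedNumberOfRenewsPerMin = count * 2;
            numberOfRenewsPerMinThreshold = (int) ((count * 2) * renewalPercentThreshold);
        }
    }
}
```

Starting from 100 instances, the threshold becomes (int) (200 * 0.85) = 170. If the count then drops to 50, the check 100 > 0.85 * 170 = 144.5 is false, so the threshold stays at 170. At most 100 renewals per minute can now arrive, so `isLeaseExpirationEnabled()` keeps returning false. A later count of 90 (180 > 144.5) finally lowers the threshold to 153. With self-preservation disabled, the threshold always follows the current count.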

