Spring Cloud Eureka Server Source Code

Myth丶恋晨, 2022-05-27 06:59

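The server side, also called the registry, has the following responsibilities in short:

  1. Accept registrations from clients and store them
  2. Let other clients query the registration information
  3. Synchronize registration information with other nodes
  4. Evict expired client instances
  5. Self-preservation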

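How is each of these implemented?
Points 1 and 2 are interfaces exposed to clients, so I won't go into them here; let's look at the remaining three.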

Startup

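EurekaServerInitializerConfiguration implements the ServletContextAware and SmartLifecycle interfaces. As a SmartLifecycle bean, its start() lifecycle method is called automatically by the Spring container during application context refresh: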

```java
@Override
public void start() {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                //TODO: is this class even needed now?
                eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }
    }).start();
}
```
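
On a new thread, start() does two things:

  1. Initializes the context: eurekaServerBootstrap.contextInitialized()
  2. Publishes the "registry available" event (EurekaRegistryAvailableEvent) and the "server started" event (EurekaServerStartedEvent)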

The initialization method of EurekaServerBootstrap, contextInitialized():

```java
public void contextInitialized(ServletContext context) {
    try {
        initEurekaEnvironment();
        initEurekaServerContext();
        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}
```

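The first call initializes the environment and the second initializes the server context, which is then stored as a ServletContext attribute. Let's look at initEurekaServerContext():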

```java
protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }
    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");
    // Copy registry from neighboring eureka node
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}
```

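The steps that matter for the rest of this article are:

  1. Copy the registry from a neighboring Eureka node: this.registry.syncUp();
  2. Open the server for traffic: this.registry.openForTraffic(this.applicationInfoManager, registryCount);
  3. Register the monitoring statistics: EurekaMonitors.registerAllStats();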

```java
/**
 * Populates the registry information from a peer eureka node. This operation fails over to other nodes
 * until the list is exhausted if the communication fails.
 */
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got " + count + " instances from neighboring DS node");
    logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}
```

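syncUp() calls eurekaClient.getApplications() to get the registry that the server's embedded Eureka client fetched from peer nodes. It then iterates over every instance and registers each registerable one into the local registry (with isReplication = true). In other words, data flows from the peers into this node, not the other way round.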

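Two configuration properties control this loop: numberRegistrySyncRetries (read through getRegistrySyncRetries()) and registrySyncRetryWaitMs. The former is the maximum number of attempts to pull the registry, and the latter is how long to wait before each retry. The default retry count is 5. The loop stops as soon as at least one instance has been copied. It does not guarantee success: if every attempt comes back empty, syncUp() simply returns 0.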
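The retry behavior of syncUp() can be modeled in plain Java. This is a simplified sketch, not Eureka code: the hypothetical peerFetch supplier stands in for eurekaClient.getApplications(), instances are plain string ids, and every fetched instance is treated as registerable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the syncUp() retry loop (not Eureka code).
class SyncUpSketch {
    // peerFetch stands in for eurekaClient.getApplications(); each element is an
    // instance id that gets "registered" by adding it to localRegistry.
    static int syncUp(Supplier<List<String>> peerFetch, int retries, long waitMs,
                      List<String> localRegistry) {
        int count = 0;
        // Keep going only while nothing has been copied and attempts remain.
        for (int i = 0; i < retries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs); // wait before the next attempt
                } catch (InterruptedException e) {
                    // Eureka logs a warning and gives up; here we also restore the interrupt flag.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            for (String instanceId : peerFetch.get()) {
                localRegistry.add(instanceId);
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The peer returns nothing on the first two attempts, then two instances.
        Supplier<List<String>> flakyPeer =
                () -> ++calls[0] < 3 ? List.<String>of() : List.of("app-a:1", "app-b:1");
        List<String> local = new ArrayList<>();
        int copied = syncUp(flakyPeer, 5, 10, local);
        System.out.println("copied=" + copied + " after " + calls[0] + " attempts"); // copied=2 after 3 attempts
    }
}
```

With retries set to 2, the same flaky peer would yield 0 after two attempts, which is the case where the server starts with an empty registry.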

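openForTraffic() computes the renewal threshold (each instance renews every 30 seconds, so count * 2 renewals are expected per minute, scaled by renewalPercentThreshold), marks the server UP, and finally calls super.postInit(), which starts the scheduled task that evicts expired clients.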
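The threshold arithmetic in openForTraffic() can be checked with a small worked example. The class and method names below are hypothetical; only the formulas (count * 2, then the product with renewalPercentThreshold cast to int) come from the code above, with 0.85 as the default renewalPercentThreshold.

```java
// Hypothetical helper reproducing the arithmetic of openForTraffic().
class RenewThresholdSketch {
    // Each instance renews every 30 s, i.e. twice per minute.
    static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    // The expectation scaled by renewalPercentThreshold and truncated to int.
    static int renewsPerMinThreshold(int instanceCount, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instanceCount) * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        int count = 10; // e.g. 10 instances copied from a peer by syncUp()
        System.out.println(expectedRenewsPerMin(count));        // 20
        System.out.println(renewsPerMinThreshold(count, 0.85)); // 17
    }
}
```

If syncUp() copied nothing (for example on a single standalone server), both values start at 0.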

```java
protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
```

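renewsLastMin.start(): every Eureka server keeps track of how many renewals it received in the last minute. renewsLastMin counts renewals in a long counter, and start() schedules a timer that, once a minute, saves the count as the last minute's value and resets the counter to 0.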
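To illustrate what renewsLastMin does, here is a minimal analogue of a two-bucket per-minute counter. MinuteRateCounter is a hypothetical name, and it uses a ScheduledExecutorService where Eureka's MeasuredRate uses its own timer; the bucket swap in rollOver() is the part that matters.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical analogue of renewsLastMin: currentBucket counts renewals in the
// running interval, lastBucket holds the total of the previous interval.
class MinuteRateCounter {
    private final AtomicLong currentBucket = new AtomicLong();
    private final AtomicLong lastBucket = new AtomicLong();
    private final long intervalMs;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "minute-rate-counter");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });

    MinuteRateCounter(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Called on every renewal (heartbeat).
    public void increment() {
        currentBucket.incrementAndGet();
    }

    // Renewals counted during the last completed interval.
    public long getCount() {
        return lastBucket.get();
    }

    // Move the running count into lastBucket and zero the running bucket.
    public void rollOver() {
        lastBucket.set(currentBucket.getAndSet(0));
    }

    // Roll over once per interval, like renewsLastMin.start().
    public void start() {
        timer.scheduleAtFixedRate(this::rollOver, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        timer.shutdownNow();
    }
}
```

In use, the registry would create it with a 60_000 ms interval, call start() once, call increment() on each renewal, and read getCount() when it needs "renewals in the last minute".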

Eviction is implemented by the EvictionTask class, which calls evict():

```java
class EvictionTask extends TimerTask {
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }

    /**
     * compute a compensation time defined as the actual time this task was executed since the prev iteration,
     * vs the configured amount of time for execution. This is useful for cases where changes in time (due to
     * clock skew or gc for example) causes the actual eviction task to execute later than the desired time
     * according to the configured cycle.
     */
    long getCompensationTimeMs() {
        long currNanos = getCurrentTimeNano();
        long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0l) {
            return 0l;
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
        return compensationTime <= 0l ? 0l : compensationTime;
    }

    long getCurrentTimeNano() { // for testing
        return System.nanoTime();
    }
}

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}
```
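The compensation logic of getCompensationTimeMs() can be isolated as below. The class name and the explicit currNanos parameter are hypothetical (the real task reads System.nanoTime()); the 60 s interval in main() matches the default evictionIntervalTimerInMs. The result is passed to evict() as additionalLeaseMs, so a task that fires late extends every lease by the same delay instead of evicting instances that only look expired because the timer itself was late.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical standalone version of EvictionTask.getCompensationTimeMs().
class CompensationSketch {
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);
    private final long evictionIntervalMs;

    CompensationSketch(long evictionIntervalMs) {
        this.evictionIntervalMs = evictionIntervalMs;
    }

    long compensationTimeMs(long currNanos) {
        long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L; // first run: no previous execution to compare with
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        // Only a late run produces compensation; an early or on-time run gives 0.
        return Math.max(elapsedMs - evictionIntervalMs, 0L);
    }

    public static void main(String[] args) {
        CompensationSketch s = new CompensationSketch(60_000);
        long t0 = TimeUnit.SECONDS.toNanos(1);
        System.out.println(s.compensationTimeMs(t0));                                 // 0 (first run)
        System.out.println(s.compensationTimeMs(t0 + TimeUnit.SECONDS.toNanos(60)));  // 0 (on time)
        System.out.println(s.compensationTimeMs(t0 + TimeUnit.SECONDS.toNanos(125))); // 5000 (5 s late)
    }
}
```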

isExpired() decides whether a lease has expired:

```java
public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
```

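evictionTimestamp: the time the lease was cancelled, for example when the client asked to go offline; any value > 0 means the lease is already cancelled.
lastUpdateTimestamp: the time the lease was last updated by a renewal.
duration: how long each renewal extends the lease, i.e. the lease-expiration-duration-in-seconds the client sent when registering (90 seconds by default).
additionalLeaseMs: the compensation time computed by EvictionTask.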
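The expiry condition can be exercised with a small model. LeaseSketch and the explicit nowMs parameter are hypothetical (the real method reads System.currentTimeMillis()). One caveat from Eureka's own Lease source: renew() stores the current time plus duration in lastUpdateTimestamp, so an instance that stops renewing is in practice treated as expired only after about 2 × duration (roughly 180 s with the 90 s default); the source comments describe this as a minor bug left unfixed for compatibility. The sketch below models only the isExpired() condition itself.

```java
// Hypothetical model of the fields that isExpired() reads.
class LeaseSketch {
    long evictionTimestamp;   // > 0 once the lease has been cancelled
    long lastUpdateTimestamp; // time of the last update (registration or renewal)
    final long durationMs;    // lease-expiration-duration-in-seconds, in milliseconds

    LeaseSketch(long lastUpdateTimestamp, long durationMs) {
        this.lastUpdateTimestamp = lastUpdateTimestamp;
        this.durationMs = durationMs;
    }

    // Same condition as Lease.isExpired(additionalLeaseMs), with "now" passed in.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return evictionTimestamp > 0
                || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(0, 90_000);     // last updated at t=0, 90 s lease
        System.out.println(lease.isExpired(91_000, 0));     // true: 1 s past the lease
        System.out.println(lease.isExpired(91_000, 5_000)); // false: 5 s of compensation
        lease.evictionTimestamp = 1;                        // cancelled
        System.out.println(lease.isExpired(0, 0));          // true regardless of time
    }
}
```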

Batched eviction

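As the code above shows, eviction happens in batches: with the default threshold, one run evicts at most about 15% of the registered instances, and expired leases beyond that limit stay in the registry until a later run. The instances evicted in a run are chosen at random from the expired ones.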

```java
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
```

serverConfig.getRenewalPercentThreshold() defaults to 0.85, which is where the 15% comes from (1 - 0.85).
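The eviction limit and the random selection in evict() can be reproduced outside Eureka. The names are hypothetical, and the sketch returns the chosen items instead of calling internalCancel(). Because of the int cast, the limit is only approximately 15%: for a registry of 10 instances it is 2.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the batching logic in evict().
class BatchedEvictionSketch {
    // Same arithmetic as evict(): registrySize - (int) (registrySize * threshold).
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Picks up to evictionLimit items uniformly at random with a partial
    // Fisher-Yates (Knuth) shuffle, like the loop in evict().
    static <T> List<T> pickForEviction(List<T> expired, int registrySize,
                                       double renewalPercentThreshold, Random random) {
        List<T> pool = new ArrayList<>(expired); // do not mutate the caller's list
        int toEvict = Math.min(pool.size(), evictionLimit(registrySize, renewalPercentThreshold));
        List<T> chosen = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
            chosen.add(pool.get(i));
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(evictionLimit(100, 0.85)); // 15
        System.out.println(evictionLimit(10, 0.85));  // 2
        List<String> expired = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            expired.add("instance-" + i);
        }
        // 40 of 100 instances have expired, but only 15 are evicted in this run.
        System.out.println(pickForEviction(expired, 100, 0.85, new Random(42)).size()); // 15
    }
}
```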

Self-preservation

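When self-preservation kicks in, lease expiration is disabled and the eviction mechanism above is skipped entirely.
The check sits at the top of evict():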

```java
if (!isLeaseExpirationEnabled()) {
    logger.debug("DS: lease expiration is currently disabled.");
    return;
}
```
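The body of isLeaseExpirationEnabled() is not shown above. As far as I know, in Netflix Eureka's PeerAwareInstanceRegistryImpl it works roughly like the sketch below: with self-preservation switched off, expiry is always allowed; with it on, expiry is allowed only while the renewals counted by renewsLastMin in the last minute exceed numberOfRenewsPerMinThreshold from openForTraffic(). The standalone signature with explicit parameters is hypothetical; in Spring Cloud the switch is the eureka.server.enable-self-preservation property (true by default).

```java
// Hypothetical standalone version of the self-preservation check.
class SelfPreservationSketch {
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int renewsPerMinThreshold,
                                            long renewsInLastMin) {
        if (!selfPreservationEnabled) {
            // Self-preservation switched off: leases may always expire.
            return true;
        }
        // Otherwise expire leases only while renewals look healthy.
        return renewsPerMinThreshold > 0 && renewsInLastMin > renewsPerMinThreshold;
    }

    public static void main(String[] args) {
        int threshold = 17; // 10 instances: (int) (10 * 2 * 0.85)
        System.out.println(isLeaseExpirationEnabled(true, threshold, 20));  // true: renewals healthy
        System.out.println(isLeaseExpirationEnabled(true, threshold, 12));  // false: self-preservation
        System.out.println(isLeaseExpirationEnabled(false, threshold, 12)); // true: protection off
    }
}
```

So if a network partition cuts the renewals from 20 to 12 per minute, evict() returns early and no instance is removed, which is the point of self-preservation.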
