Spring Cloud Eureka(七):DiscoveryClient 源码分析

布满荆棘的人生 2022-05-11 15:24 372阅读 0赞

文章目录

    • 1、本节概要
    • 2、服务注册(register)
    • 3、服务续约(renew)
    • 4、服务下线(unregister)
    • 5、服务获取(fetchRegistry)
    • 6、缓存刷新(refreshRegistry)

1、本节概要

上一节文章主要介绍了Eureka Client 的服务注册的流程,没有对服务治理进行介绍,本文目的就是从源码角度来学习服务实例的治理机制,主要包括以下内容:

  • 服务注册(register)
  • 服务续约(renew)
  • 服务下线(unregister)
  • 服务拉取(fetchRegistry)
  • 缓存刷新(refreshRegistry)

eureka client 与 eureka server 的交互式通过REST API 来完成的,那么这时使用的HttpClient工具在,Netflix eureka和 spring cloud eureka 中是有区别的,前者使用的是 JerseyReplicationClient,后者使用的是 RestTemplateEurekaHttpClient

2、服务注册(register)

服务注册的方式是通过rest api 进行注册的,注册成功之后返回的正常状态码是 204

  1. boolean register() throws Throwable {
  2. logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
  3. EurekaHttpResponse<Void> httpResponse;
  4. try {
  5. httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
  6. } catch (Exception e) {
  7. logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
  8. throw e;
  9. }
  10. if (logger.isInfoEnabled()) {
  11. logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
  12. }
  13. return httpResponse.getStatusCode() == 204;
  14. }

3、服务续约(renew)

服务先向server端发送一个心跳,如果返回状态是 200,则表示续约成功;如果返回状态码是404,则向服务端重新注册自己。
续约服务是由一个守护线程每隔 30秒向服务端发送心跳来完成的

  1. /**
  2. * 续约方法
  3. */
  4. boolean renew() {
  5. EurekaHttpResponse<InstanceInfo> httpResponse;
  6. try {
  7. httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
  8. logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
  9. if (httpResponse.getStatusCode() == 404) {
  10. REREGISTER_COUNTER.increment();
  11. logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
  12. long timestamp = instanceInfo.setIsDirtyWithTime();
  13. boolean success = register();
  14. if (success) {
  15. instanceInfo.unsetIsDirty(timestamp);
  16. }
  17. return success;
  18. }
  19. return httpResponse.getStatusCode() == 200;
  20. } catch (Throwable e) {
  21. logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
  22. return false;
  23. }
  24. }
  25. // 心跳定时器
  26. scheduler.schedule(
  27. new TimedSupervisorTask(
  28. "heartbeat",
  29. scheduler,
  30. heartbeatExecutor,
  31. renewalIntervalInSecs,
  32. TimeUnit.SECONDS,
  33. expBackOffBound,
  34. new HeartbeatThread()
  35. ),
  36. renewalIntervalInSecs, TimeUnit.SECONDS);
  37. private class HeartbeatThread implements Runnable {
  38. public void run() {
  39. if (renew()) {
  40. lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
  41. }
  42. }
  43. }

4、服务下线(unregister)

通过rest api 向server端发送取消请求,那么在server端将会注销掉该服务,前提是请求注销的服务在注册中心已经存在了。

  1. void unregister() {
  2. // It can be null if shouldRegisterWithEureka == false
  3. if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
  4. try {
  5. logger.info("Unregistering ...");
  6. EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
  7. logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
  8. } catch (Exception e) {
  9. logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
  10. }
  11. }
  12. }

5、服务获取(fetchRegistry)

如果增量拉取被禁用或是第一次拉取,则全量拉取server端已经注册的服务实例信息,否则只拉取增量服务实例数据

  1. private boolean fetchRegistry(boolean forceFullRegistryFetch) {
  2. Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
  3. try {
  4. // If the delta is disabled or if it is the first time, get all
  5. // applications
  6. Applications applications = getApplications();
  7. if (clientConfig.shouldDisableDelta()
  8. || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
  9. || forceFullRegistryFetch
  10. || (applications == null)
  11. || (applications.getRegisteredApplications().size() == 0)
  12. || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
  13. {
  14. logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
  15. logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
  16. logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
  17. logger.info("Application is null : {}", (applications == null));
  18. logger.info("Registered Applications size is zero : {}",
  19. (applications.getRegisteredApplications().size() == 0));
  20. logger.info("Application version is -1: {}", (applications.getVersion() == -1));
  21. //全量拉取服务实例数据
  22. getAndStoreFullRegistry();
  23. } else {
  24. //增量拉取服务实例
  25. getAndUpdateDelta(applications);
  26. }
  27. applications.setAppsHashCode(applications.getReconcileHashCode());
  28. logTotalInstances();
  29. } catch (Throwable e) {
  30. logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
  31. return false;
  32. } finally {
  33. if (tracer != null) {
  34. tracer.stop();
  35. }
  36. }
  37. // 刷新本地缓存
  38. onCacheRefreshed();
  39. // 基于缓存中的实例数据更新远程实例状态
  40. updateInstanceRemoteStatus();
  41. // 注册表拉取成功后返回true
  42. return true;
  43. }

6、缓存刷新(refreshRegistry)

系统默认是每隔30秒刷新本地存储的注册表

  1. private void initScheduledTasks() {
  2. if (clientConfig.shouldFetchRegistry()) {
  3. // registry cache refresh timer
  4. int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
  5. int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
  6. scheduler.schedule(
  7. new TimedSupervisorTask(
  8. "cacheRefresh",
  9. scheduler,
  10. cacheRefreshExecutor,
  11. registryFetchIntervalSeconds,
  12. TimeUnit.SECONDS,
  13. expBackOffBound,
  14. new CacheRefreshThread()
  15. ),
  16. registryFetchIntervalSeconds, TimeUnit.SECONDS);
  17. }
  18. ...................................
  19. }
  20. class CacheRefreshThread implements Runnable {
  21. public void run() {
  22. refreshRegistry();
  23. }
  24. }
  25. @VisibleForTesting
  26. void refreshRegistry() {
  27. try {
  28. boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
  29. boolean remoteRegionsModified = false;
  30. // This makes sure that a dynamic change to remote regions to fetch is honored.
  31. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
  32. if (null != latestRemoteRegions) {
  33. String currentRemoteRegions = remoteRegionsToFetch.get();
  34. if (!latestRemoteRegions.equals(currentRemoteRegions)) {
  35. // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
  36. synchronized (instanceRegionChecker.getAzToRegionMapper()) {
  37. if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
  38. String[] remoteRegions = latestRemoteRegions.split(",");
  39. remoteRegionsRef.set(remoteRegions);
  40. instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
  41. remoteRegionsModified = true;
  42. } else {
  43. logger.info("Remote regions to fetch modified concurrently," +
  44. " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
  45. }
  46. }
  47. } else {
  48. // Just refresh mapping to reflect any DNS/Property change
  49. instanceRegionChecker.getAzToRegionMapper().refreshMapping();
  50. }
  51. }
  52. boolean success = fetchRegistry(remoteRegionsModified);
  53. if (success) {
  54. registrySize = localRegionApps.get().size();
  55. lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
  56. }
  57. if (logger.isDebugEnabled()) {
  58. StringBuilder allAppsHashCodes = new StringBuilder();
  59. allAppsHashCodes.append("Local region apps hashcode: ");
  60. allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
  61. allAppsHashCodes.append(", is fetching remote regions? ");
  62. allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
  63. for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
  64. allAppsHashCodes.append(", Remote region: ");
  65. allAppsHashCodes.append(entry.getKey());
  66. allAppsHashCodes.append(" , apps hashcode: ");
  67. allAppsHashCodes.append(entry.getValue().getAppsHashCode());
  68. }
  69. logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
  70. allAppsHashCodes);
  71. }
  72. } catch (Throwable e) {
  73. logger.error("Cannot fetch registry from server", e);
  74. }
  75. }

文末总结:

1、Eureka client从注册中心拉取服务列表,然后自身会做缓存

2、作为服务消费者,就是从这些缓存信息中获取的服务提供者的信息

3、增量更新的服务以30秒为周期循环更新

4、增量更新数据在服务端保存时间为3分钟,因此Eureka client取得的数据虽然是增量更新,仍然可能和30秒前取的数据一样,所以Eureka client要自己来处理重复信息

5、Eureka client的增量更新,其实获取的是Eureka server最近三分钟内的变更,如果Eureka client有超过三分钟没有做增量更新的话(例如网络问题),这就造成了Eureka server和Eureka client之间的数据不一致。正常情况下,Eureka client多次增量更新后,最终的服务列表数据应该Eureka server保持一致,但如果期间发生异常,可能导致和Eureka server的数据不一致,为了暴露这个问题,Eureka server每次返回的增量更新数据中,会带有一致性哈希码,Eureka client用本地服务列表数据算出的一致性哈希码应该和Eureka server返回的一致,若不一致就证明增量更新出了问题导致Eureka client和Eureka server上的服务列表信息不一致了,此时需要全量更新

发表评论

表情:
评论列表 (有 0 条评论,372人围观)

还没有评论,来说两句吧...

相关阅读

    相关 cloud-eureka

    Eureka的一些概念 Register:服务注册 当Eureka客户端向Eureka Server注册时,它提供自身的元数据,比如IP地址、端口,运行状况指示符UR