Spring Cloud Eureka源码解析之Eureka Client启动流程(二)

爱被打了一巴掌 2022-09-16 04:52 330阅读 0赞

先看 Eureka Client 的自动装配类 EurekaClientAutoConfiguration。
其中 EurekaClientConfigBean 承载了 eureka.client 前缀下的配置:

  1. // Exposes the eureka.client settings as an EurekaClientConfigBean. Per the condition
  2. // below, it is only declared when no EurekaClientConfig bean is found (CURRENT search).
  3. @Bean
  4. @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
  5. public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
  6. EurekaClientConfigBean clientConfig = new EurekaClientConfigBean();
  7. // The lookup goes through this.env (the enclosing class's field), not the parameter.
  8. String configName = this.env.getProperty("spring.config.name");
  9. boolean bootstrapping = "bootstrap".equals(configName);
  10. if (bootstrapping) {
  11. // No registration during bootstrap by default; there will be another chance later.
  12. clientConfig.setRegisterWithEureka(false);
  13. }
  14. return clientConfig;
  15. }

EurekaInstanceConfigBean 承载了 eureka.instance 前缀下的配置:

  1. // Builds the EurekaInstanceConfigBean from environment properties: ports, instance id,
  2. // optional hostname / IP overrides, status and health-check URLs, and the JMX port.
  3. @Bean
  4. @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
  5. public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
  6. ManagementMetadataProvider managementMetadataProvider) {
  7. // Explicit eureka.instance.* overrides.
  8. String configuredHostname = getProperty("eureka.instance.hostname");
  9. boolean preferIp = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
  10. String configuredIp = getProperty("eureka.instance.ip-address");
  11. boolean securePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
  12. // Server settings: "server.port" falls back to "port", which falls back to 8080.
  13. String contextPath = env.getProperty("server.context-path", "/");
  14. String fallbackPort = env.getProperty("port", "8080");
  15. int port = Integer.parseInt(env.getProperty("server.port", fallbackPort));
  16. // The three values below may be null when the property is not set.
  17. Integer mgmtPort = env.getProperty("management.server.port", Integer.class);
  18. String mgmtContextPath = env.getProperty("management.server.context-path");
  19. Integer jmxRemotePort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);
  20. EurekaInstanceConfigBean config = new EurekaInstanceConfigBean(inetUtils);
  21. config.setNonSecurePort(port);
  22. config.setInstanceId(getDefaultInstanceId(env));
  23. config.setPreferIpAddress(preferIp);
  24. config.setSecurePortEnabled(securePortEnabled);
  25. if (StringUtils.hasText(configuredIp)) {
  26. config.setIpAddress(configuredIp);
  27. }
  28. if (securePortEnabled) {
  29. // The secure port is set to the same server port value.
  30. config.setSecurePort(port);
  31. }
  32. if (StringUtils.hasText(configuredHostname)) {
  33. config.setHostname(configuredHostname);
  34. }
  35. // Optional URL-path overrides, each applied only when it has text.
  36. String statusPagePath = getProperty("eureka.instance.status-page-url-path");
  37. if (StringUtils.hasText(statusPagePath)) {
  38. config.setStatusPageUrlPath(statusPagePath);
  39. }
  40. String healthCheckPath = getProperty("eureka.instance.health-check-url-path");
  41. if (StringUtils.hasText(healthCheckPath)) {
  42. config.setHealthCheckUrlPath(healthCheckPath);
  43. }
  44. // The provider is handed the partially configured instance, so it is called after the setters above.
  45. ManagementMetadata management = managementMetadataProvider.get(config, port,
  46. contextPath, mgmtContextPath, mgmtPort);
  47. if (management != null) {
  48. config.setStatusPageUrl(management.getStatusPageUrl());
  49. config.setHealthCheckUrl(management.getHealthCheckUrl());
  50. if (config.isSecurePortEnabled()) {
  51. config.setSecureHealthCheckUrl(management.getSecureHealthCheckUrl());
  52. }
  53. // Publish the management port in the metadata map unless a value is already there.
  54. Map<String, String> instanceMetadata = config.getMetadataMap();
  55. if (instanceMetadata.get("management.port") == null) {
  56. instanceMetadata.put("management.port", String.valueOf(management.getManagementPort()));
  57. }
  58. }
  59. setupJmxPort(config, jmxRemotePort);
  60. return config;
  61. }

再看 RefreshableEurekaClientConfiguration.eurekaClient 方法:
在这里插入图片描述
DiscoveryClient 是 Eureka 客户端的核心类,而 CloudEurekaClient 是 DiscoveryClient 的子类。
在这里插入图片描述
在这里插入图片描述
进入 DiscoveryClient 的构造方法,其中会创建线程池并初始化各类定时任务:

  /**
   * Injected constructor. Stores the supplied collaborators and creates the staleness monitors.
   * If the config asks neither to register nor to fetch the registry, it records itself with
   * DiscoveryManager and returns early with the scheduler, executors and transport set to null.
   * Otherwise it creates the scheduler and the heartbeat / cache-refresh executors, does the
   * initial registry fetch (using the backup registry when that fails), runs the
   * pre-registration handler, optionally registers at init, and calls initScheduledTasks().
   *
   * Throws RuntimeException when executor / transport / region-mapper setup fails, and
   * IllegalStateException when enforced registration at init fails.
   */
  1. @Inject
  2. DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
  3. Provider<BackupRegistry> backupRegistryProvider) {
  4. if (args != null) {
  5. this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
  6. this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
  7. this.eventListeners.addAll(args.getEventListeners());
  8. this.preRegistrationHandler = args.preRegistrationHandler;
  9. } else {
  10. this.healthCheckCallbackProvider = null;
  11. this.healthCheckHandlerProvider = null;
  12. this.preRegistrationHandler = null;
  13. }
  14. this.applicationInfoManager = applicationInfoManager;
  15. // This client's own eureka instance info
  16. InstanceInfo myInfo = applicationInfoManager.getInfo();
  17. clientConfig = config;
  18. staticClientConfig = clientConfig;
  19. transportConfig = config.getTransportConfig();
  20. instanceInfo = myInfo;
  21. if (myInfo != null) {
  22. appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
  23. } else {
  24. logger.warn("Setting instanceInfo to a passed in null value");
  25. }
  26. this.backupRegistryProvider = backupRegistryProvider;
  27. this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
  28. localRegionApps.set(new Applications());
  29. fetchRegistryGeneration = new AtomicLong(0);
  30. remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
  31. remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); // comma-separated region list, or null
  32. if (config.shouldFetchRegistry()) {
  33. this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{ 15L, 30L, 60L, 120L, 240L, 480L});
  34. } else {
  35. this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
  36. }
  37. if (config.shouldRegisterWithEureka()) {
  38. this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{ 15L, 30L, 60L, 120L, 240L, 480L});
  39. } else {
  40. this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
  41. }
  42. logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
  43. // Neither registering with the eureka server nor fetching the registry from it: return early
  44. if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
  45. logger.info("Client configured to neither register nor query for data.");
  46. scheduler = null;
  47. heartbeatExecutor = null;
  48. cacheRefreshExecutor = null;
  49. eurekaTransport = null;
  50. instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
  51. // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
  52. // to work with DI'd DiscoveryClient
  53. DiscoveryManager.getInstance().setDiscoveryClient(this);
  54. DiscoveryManager.getInstance().setEurekaClientConfig(config);
  55. initTimestampMs = System.currentTimeMillis();
  56. logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
  57. initTimestampMs, this.getApplications().size());
  58. return; // no need to setup up an network tasks and we are done
  59. }
  60. try {
  61. // default size of 2 - 1 each for heartbeat and cacheRefresh
  62. scheduler = Executors.newScheduledThreadPool(2,
  63. new ThreadFactoryBuilder()
  64. .setNameFormat("DiscoveryClient-%d")
  65. .setDaemon(true)
  66. .build());
  67. // Thread pool used for sending heartbeats
  68. heartbeatExecutor = new ThreadPoolExecutor(
  69. 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
  70. new SynchronousQueue<Runnable>(),
  71. new ThreadFactoryBuilder()
  72. .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
  73. .setDaemon(true)
  74. .build()
  75. ); // use direct handoff
  76. // Thread pool used for refreshing the cached registry information
  77. cacheRefreshExecutor = new ThreadPoolExecutor(
  78. 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
  79. new SynchronousQueue<Runnable>(),
  80. new ThreadFactoryBuilder()
  81. .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
  82. .setDaemon(true)
  83. .build()
  84. ); // use direct handoff
  85. eurekaTransport = new EurekaTransport();
  86. scheduleServerEndpointTask(eurekaTransport, args);
  87. AzToRegionMapper azToRegionMapper;
  88. if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
  89. azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
  90. } else {
  91. azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
  92. }
  93. if (null != remoteRegionsToFetch.get()) {
  94. azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
  95. }
  96. instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
  97. } catch (Throwable e) { // any failure in the setup above is rethrown as a RuntimeException
  98. throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
  99. }
  100. // Initial fetch; if it fails, fall back to fetching from the backup registry
  101. if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
  102. fetchRegistryFromBackup();
  103. }
  104. // call and execute the pre registration handler before all background tasks (inc registration) is started
  105. if (this.preRegistrationHandler != null) {
  106. this.preRegistrationHandler.beforeRegistration();
  107. }
  108. // Register here only when registration is enabled AND registration is enforced at init (the article's author notes the latter defaults to false, so register() is normally skipped here)
  109. if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
  110. try {
  111. if (!register() ) {
  112. throw new IllegalStateException("Registration error at startup. Invalid server response.");
  113. }
  114. } catch (Throwable th) { // also catches the IllegalStateException thrown just above
  115. logger.error("Registration error at startup: {}", th.getMessage());
  116. throw new IllegalStateException(th);
  117. }
  118. }
  119. // Start the scheduled tasks; heartbeat, registry fetching and similar features are set up here
  120. initScheduledTasks();
  121. try {
  122. Monitors.registerObject(this);
  123. } catch (Throwable e) { // best effort: a monitor registration failure is only logged
  124. logger.warn("Cannot register timers", e);
  125. }
  126. // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
  127. // to work with DI'd DiscoveryClient
  128. DiscoveryManager.getInstance().setDiscoveryClient(this);
  129. DiscoveryManager.getInstance().setEurekaClientConfig(config);
  130. initTimestampMs = System.currentTimeMillis();
  131. logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
  132. initTimestampMs, this.getApplications().size());
  133. }
  134. // Starts all periodic tasks of the eureka client: the registry cache refresh and,
  135. // when registration is enabled, the heartbeat and the instance-info replicator.
  136. private void initScheduledTasks() {
  137. if (clientConfig.shouldFetchRegistry()) {
  138. // Periodic cache refresh: pulls the registry information.
  139. int fetchIntervalSecs = clientConfig.getRegistryFetchIntervalSeconds();
  140. int fetchBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
  141. TimedSupervisorTask cacheRefreshTask = new TimedSupervisorTask(
  142. "cacheRefresh", scheduler, cacheRefreshExecutor,
  143. fetchIntervalSecs, TimeUnit.SECONDS, fetchBackOffBound,
  144. new CacheRefreshThread());
  145. scheduler.schedule(cacheRefreshTask, fetchIntervalSecs, TimeUnit.SECONDS);
  146. }
  147. if (!clientConfig.shouldRegisterWithEureka()) {
  148. logger.info("Not registering with Eureka server per configuration");
  149. return;
  150. }
  151. int heartbeatIntervalSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
  152. int heartbeatBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
  153. logger.info("Starting heartbeat executor: " + "renew interval is: {}", heartbeatIntervalSecs);
  154. // Heartbeat timer.
  155. TimedSupervisorTask heartbeatTask = new TimedSupervisorTask(
  156. "heartbeat", scheduler, heartbeatExecutor,
  157. heartbeatIntervalSecs, TimeUnit.SECONDS, heartbeatBackOffBound,
  158. new HeartbeatThread());
  159. scheduler.schedule(heartbeatTask, heartbeatIntervalSecs, TimeUnit.SECONDS);
  160. // Instance-info replicator; per the article author's note, it actually carries the registration logic.
  161. int replicationIntervalSecs = clientConfig.getInstanceInfoReplicationIntervalSeconds();
  162. instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, replicationIntervalSecs, 2); // burstSize
  163. // Listener for changes of the local instance status.
  164. statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
  165. @Override
  166. public String getId() {
  167. return "statusChangeListener";
  168. }
  169. @Override
  170. public void notify(StatusChangeEvent event) {
  171. boolean downInvolved = InstanceStatus.DOWN == event.getStatus()
  172. || InstanceStatus.DOWN == event.getPreviousStatus();
  173. if (downInvolved) {
  174. // log at warn level if DOWN was involved
  175. logger.warn("Saw local status change event {}", event);
  176. } else {
  177. logger.info("Saw local status change event {}", event);
  178. }
  179. // May lead to a re-registration (author's note).
  180. instanceInfoReplicator.onDemandUpdate();
  181. }
  182. };
  183. if (clientConfig.shouldOnDemandUpdateStatusChange()) {
  184. applicationInfoManager.registerStatusChangeListener(statusChangeListener);
  185. }
  186. // Start the replicator, which includes the registration logic (author's note).
  187. int initialReplicationDelaySecs = clientConfig.getInitialInstanceInfoReplicationIntervalSeconds();
  188. instanceInfoReplicator.start(initialReplicationDelaySecs);
  189. }

发表评论

表情:
评论列表 (有 0 条评论,330人围观)

还没有评论,来说两句吧...

相关阅读