Nacos系列【26】源码分析篇之客户端自动注册

た 入场券 2024-04-08 11:16 238阅读 0赞

有道无术,术尚可求,有术无道,止于术。

版本說明:

spring-boot:2.6.3

spring-cloud:2021.0.1

spring-cloud-alibaba:2.2.8.RELEASE

Nacos:2.1.1

文章目录

    • 前言
      1. 加载日志系统
      1. 启动Grpc 客户端
      1. 自动配置类
      1. NacosWatch
      1. 自动注册

前言

紧接上篇,接下来分析下客户端自动注册的流程。

1. 加载日志系统

Spring Boot开启启动,会加载spring.factories中的自动配置类:

  • NacosDiscoveryAutoConfiguration
  • RibbonNacosAutoConfiguration
  • NacosDiscoveryEndpointAutoConfiguration
  • NacosServiceRegistryAutoConfiguration
  • NacosReactiveDiscoveryClientConfiguration
  • NacosConfigServerAutoConfiguration
  • NacosDiscoveryClientConfigServiceBootstrapConfiguration
  • NacosDiscoveryAutoConfiguration

在环境准备阶段,NacosLoggingListener监听器首先工作,因为它监听的事件是ApplicationEnvironmentPreparedEvent,其作用是加载日志系统:

  1. public void onApplicationEvent(ApplicationEvent applicationEvent) {
  2. // 如果存在`logback`相关的类,加载logback日志系统LogbackNacosLogging,否则使用Log4J2NacosLogging,然后加载配置
  3. NacosLogging.getInstance().loadConfiguration();
  4. }

在加载配置的最后阶段,可以看到其读取了classpath:nacos-logback.xm日志配置文件,并创建了一个LoggerContext日志上下文:

  1. private LoggerContext loadConfigurationOnStart() {
  2. String location = this.getLocation("classpath:nacos-logback.xml");
  3. try {
  4. LoggerContext loggerContext = (LoggerContext)StaticLoggerBinder.getSingleton().getLoggerFactory();
  5. NacosJoranConfigurator configurator = new NacosJoranConfigurator();
  6. configurator.setContext(loggerContext);
  7. configurator.doNacosConfigure(ResourceUtils.getResourceUrl(location));
  8. return loggerContext;
  9. } catch (Exception var4) {
  10. throw new IllegalStateException("Could not initialize Logback Nacos logging from " + location, var4);
  11. }
  12. }

而这个时候,当前应用对应的日志系统都还没有加载,所以Nacos客户端的一些日志级别配置,需要在源码配置文件中修改,或者通过JVM启动参数才能起效:
在这里插入图片描述

2. 启动Grpc 客户端

Nacos 2.x采用Grpc通信,gRPC是由google开发的一个高性能、通用的开源RPC框架,主要面向移动应用开发且基于HTTP/2协议标准而设计,同时支持大多数流行的编程语言。

使用gRPC框架,需要开启客户端和服务端,可以在GitHub学习下,这里就不再研究了。

GrpcSdkClient就是Nacos 进行通信的客户端,利用Grpc来实现服务器连接、消息处理等功能,启动逻辑在其start()方法中。在方法中会创建一些定时任务,以及和Nacos服务端中的GrpcSdkServer建立长连接。

3. 自动配置类

接着开始进行自动配置,注入配置类及Bean 对象,首先是服务管理器:
在这里插入图片描述
接着是配置类:
在这里插入图片描述
服务发现功能类:
在这里插入图片描述
服务发现客户端:
在这里插入图片描述
整个生命周期监测者NacosWatch
在这里插入图片描述
等等。。。其他类就不一一介绍了。

4. NacosWatch

NacosDiscoveryClientConfiguration自动配置类中,注册了NacosWatch
在这里插入图片描述
调用NacosWatch构造方法:

  1. public NacosWatch(NacosServiceManager nacosServiceManager, NacosDiscoveryProperties properties) {
  2. // 服务管理器
  3. this.nacosServiceManager = nacosServiceManager;
  4. // YML 配置
  5. this.properties = properties;
  6. // 多线程定时任务
  7. this.taskScheduler = getTaskScheduler();
  8. }

NacosWatch实现了ApplicationEventPublisherAware接口,作用是获取初始化事件发布器:
在这里插入图片描述
NacosWatch还实现了SmartLifecycle,该接口主要是作用是所有的bean都创建完成之后,可以执行初始化操作,在退出时执行资源销毁工作。
在这里插入图片描述
所以在应用启动后,执行重写了SmartLifecyclestart()方法:

  1. public void start() {
  2. if (this.running.compareAndSet(false, true)) {
  3. // 添加名称为app-service001:DEFAULT_GROUP 的监听器
  4. EventListener eventListener = (EventListener)this.listenerMap.computeIfAbsent(this.buildKey(), (event) -> {
  5. return new EventListener() {
  6. public void onEvent(Event event) {
  7. if (event instanceof NamingEvent) {
  8. List instances = ((NamingEvent)event).getInstances();
  9. Optional instanceOptional = NacosWatch.this.selectCurrentInstance(instances);
  10. instanceOptional.ifPresent((currentInstance) -> {
  11. NacosWatch.this.resetIfNeeded(currentInstance);
  12. });
  13. }
  14. }
  15. };
  16. });
  17. // 获取NamingService
  18. NamingService namingService = this.nacosServiceManager.getNamingService(this.properties.getNacosProperties());
  19. try {
  20. // 订阅事件
  21. namingService.subscribe(this.properties.getService(), this.properties.getGroup(), Arrays.asList(this.properties.getClusterName()), eventListener);
  22. } catch (Exception var4) {
  23. log.error("namingService subscribe failed, properties:{}", this.properties, var4);
  24. }
  25. // 定时任务去发布心跳事件
  26. this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(this::nacosServicesWatch, this.properties.getWatchDelay());
  27. }
  28. }

5. 自动注册

自动注册由NacosAutoServiceRegistration类来完成,继承了AbstractAutoServiceRegistration抽象类,类关系图如下:
在这里插入图片描述
实现了监听器接口,监听WebServerInitializedEvent事件,所以在Web容器初始化完成后,会进入到onApplicationEvent 方法:

  1. public void onApplicationEvent(WebServerInitializedEvent event) {
  2. this.bind(event);
  3. }

在这里插入图片描述
接着进入到AbstractAutoServiceRegistrationbind()方法:

  1. @Deprecated
  2. public void bind(WebServerInitializedEvent event) {
  3. // 获取上下文
  4. ApplicationContext context = event.getApplicationContext();
  5. if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
  6. // 设置端口为程序启动端口
  7. this.port.compareAndSet(0, event.getWebServer().getPort());
  8. // 调用开始方法:
  9. this.start();
  10. }
  11. }

start()方法中,调用注册方法,并发布一些事件和日志:

  1. public void start() {
  2. // 没有开启自动注册,打印debug 日志
  3. if (!this.isEnabled()) {
  4. if (logger.isDebugEnabled()) {
  5. logger.debug("Discovery Lifecycle disabled. Not starting");
  6. }
  7. } else {
  8. if (!this.running.get()) {
  9. // 开始标记,最初为false
  10. // 发布实例预注册事件
  11. this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
  12. // 开始注册
  13. this.register();
  14. if (this.shouldRegisterManagement()) {
  15. this.registerManagement();
  16. }
  17. // 发布实例注册完成事件
  18. this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
  19. this.running.compareAndSet(false, true);
  20. }
  21. }
  22. }
  23. // 调用服务注册器进行注册
  24. protected void register() {
  25. this.serviceRegistry.register(this.getRegistration());
  26. }

注册方法,调用的是NacosServiceRegistryregister(Registration registration)方法:

  1. public void register(Registration registration) {
  2. // 服务名为空,打印警告日志
  3. if (StringUtils.isEmpty(registration.getServiceId())) {
  4. log.warn("No service to register for nacos client...");
  5. } else {
  6. NamingService namingService = this.namingService();// // NamingService
  7. String serviceId = registration.getServiceId(); // 服务名=>app-service001
  8. String group = this.nacosDiscoveryProperties.getGroup();// 配置的分组,默认 DEFAULT_GROUP
  9. // 创建实例对象
  10. Instance instance = this.getNacosInstanceFromRegistration(registration);
  11. try {
  12. // 调用NamingService 注册实例
  13. namingService.registerInstance(serviceId, group, instance);
  14. log.info("nacos registry, {} {} {}:{} register finished", new Object[]{
  15. group, serviceId, instance.getIp(), instance.getPort()});
  16. } catch (Exception var7) {
  17. if (this.nacosDiscoveryProperties.isFailFast()) {
  18. log.error("nacos registry, {} register failed...{},", new Object[]{
  19. serviceId, registration.toString(), var7});
  20. ReflectionUtils.rethrowRuntimeException(var7);
  21. } else {
  22. log.warn("Failfast is false. {} register failed...{},", new Object[]{
  23. serviceId, registration.toString(), var7});
  24. }
  25. }
  26. }
  27. }

在获取实例的方法中,可以看到设置了很多实例信息:

  1. private Instance getNacosInstanceFromRegistration(Registration registration) {
  2. Instance instance = new Instance();
  3. instance.setIp(registration.getHost()); // 当前服务IP=》192.168.142.1
  4. instance.setPort(registration.getPort()); // 启动端口=》 9005
  5. instance.setWeight((double)this.nacosDiscoveryProperties.getWeight()); // 权重=》1.0
  6. instance.setClusterName(this.nacosDiscoveryProperties.getClusterName()); // 集群名称=》DEFAULT
  7. instance.setEnabled(this.nacosDiscoveryProperties.isInstanceEnabled()); // 是否主动注册的实例
  8. instance.setMetadata(registration.getMetadata()); // 元数据
  9. instance.setEphemeral(this.nacosDiscoveryProperties.isEphemeral()); // 临时实例还是永久实例
  10. return instance;
  11. }

在注册实例registerInstance方法中,会调用客户端代理进行注册:

  1. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException
  2. // 检查是否合法:心跳时间、服务剔除时间、集群名称是否合法
  3. // 心跳机制:
  4. // Nacos Server会开启一个定时任务用来检查注册服务实例的健康状况,
  5. // 对于超过15s没有收到客户端心跳的实例会将它的healthy属性设置为false(客户端服务发现时不会发现)。
  6. // 如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册)
  7. NamingUtils.checkInstanceIsLegal(instance);
  8. // 调用客户端代理进行注册
  9. this.clientProxy.registerService(serviceName, groupName, instance);
  10. }
  11. // 获取客户端执行代理进行注册
  12. public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  13. this.getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
  14. }

获取到的通信注册客户端为NamingGrpcClientProxy

  1. // 获取执行客户端代理,如果是临时实例,则会使用RPC,不是则使用HTTP。
  2. // 通信层统一到 gRPC 协议,同时完善了客户端和服务端的流量控制和负载均衡能力,提升的整体吞吐。
  3. // 由于通信使用了 RPC 方式,因此某一客户端的所有请求(无论是注册还是订阅)都通过同一个链接和同一个服务节点进行,不像之前通过 HTTP 连接可能每次请求都请求在不同的 Nacos 节点上,这就导致了服务发现的数据内容由原来的无状态化变为了与连接状态绑定的一种有状态数据。
  4. private NamingClientProxy getExecuteClientProxy(Instance instance) {
  5. return (NamingClientProxy)(instance.isEphemeral() ? this.grpcClientProxy : this.httpClientProxy);
  6. }

NamingGrpcClientProxy 缓存实例信息,接着继续注册:

  1. public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  2. LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", new Object[]{
  3. this.namespaceId, serviceName, instance});
  4. // 缓存放入内存ConcurrentMap,DEFAULT_GROUP@@app-service001=》实例信息
  5. this.redoService.cacheInstanceForRedo(serviceName, groupName, instance);
  6. // 调用客户端注册
  7. this.doRegisterService(serviceName, groupName, instance);
  8. }

最终,发送gprc请求到服务端,客户端完成注册:

  1. public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
  2. // 创建请求对象
  3. InstanceRequest request = new InstanceRequest(this.namespaceId, serviceName, groupName, "registerInstance", instance);
  4. // 发送请求到服务端
  5. this.requestToServer(request, Response.class);
  6. this.redoService.instanceRegistered(serviceName, groupName);
  7. }
  8. private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
  9. try {
  10. // 处理注册中心设置权限时,需要携带认证消息头的情况
  11. request.putAllHeader(this.getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
  12. Response response = this.requestTimeout < 0L ? this.rpcClient.request(request) : this.rpcClient.request(request, this.requestTimeout);
  13. // 处理响应结果
  14. if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
  15. throw new NacosException(response.getErrorCode(), response.getMessage());
  16. }
  17. if (responseClass.isAssignableFrom(response.getClass())) {
  18. return response;
  19. }
  20. LogUtils.NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
  21. } catch (NacosException var4) {
  22. throw var4;
  23. } catch (Exception var5) {
  24. throw new NacosException(500, "Request nacos server failed: ", var5);
  25. }
  26. throw new NacosException(500, "Server return invalid response");
  27. }

可以在控制台看到打印的注册日志:
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,238人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Nacos客户解读

    背景:         如今的网络服务大多都是分布式的集群部署,采用的架构也是面向服务的架构(SOA),大家都在服务治理领域奋斗,这不,今天要说的Nacos也是一个