rocketmq源码剖析之producer和consumer的启动关闭

浅浅的花香味﹌ 2023-06-09 10:05 146阅读 0赞

producerrocketmq的作用是消息的生产者,consumerrocketmq的作用是消息的消费者,它的生命周期是跟项目相关的,即是由使用者控制的。而为什么要将这两个角色的启动关闭流程放在一起剖析呢?是因为他们都是MQ的客户端,在启动和关闭的行为上,有着很多共同的地方。接下来便将会来仔细探究其启动和关闭的过程。

Producer

DefaultMQProducer

DefaultMQProducer的启动

producer在启动的时候会做一系列的内部初始化,其启动的源码如下所示:

  1. public void start() throws MQClientException {
  2. this.setProducerGroup(withNamespace(this.producerGroup));
  3. this.defaultMQProducerImpl.start();
  4. if (null != traceDispatcher) {
  5. try {
  6. traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
  7. } catch (MQClientException e) {
  8. log.warn("trace dispatcher start failed ", e);
  9. }
  10. }
  11. }

从简短的代码可以看见其做的事情是对producergroup的设置、核心初始化、trace的初始化。接下来便仔细说明下核心初始化过程,其余是其和consumer所共有的,会在公共行为部分进行剖析。

producer核心初始化

producer启动初始化化部分核心逻辑如下,第一次启动的时候它会执行CREATE_JUST分支:

  1. public void start(final boolean startFactory) throws MQClientException {
  2. switch (this.serviceState) {
  3. case CREATE_JUST:
  4. this.serviceState = ServiceState.START_FAILED;
  5. // 校验group参数
  6. this.checkConfig();
  7. if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
  8. // 设置producer的实例名
  9. this.defaultMQProducer.changeInstanceNameToPID();
  10. }
  11. // 创建MQClient实例
  12. this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
  13. // 向MQ客户端的producer表中注册
  14. boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
  15. if (!registerOK) {
  16. this.serviceState = ServiceState.CREATE_JUST;
  17. throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
  18. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
  19. null);
  20. }
  21. // 向当前的producer的topic表中注册一个用于测试或demo用的topic “TBW102”
  22. this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
  23. if (startFactory) {
  24. // 启动MQClient
  25. mQClientFactory.start();
  26. }
  27. log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
  28. this.defaultMQProducer.isSendMessageWithVIPChannel());
  29. this.serviceState = ServiceState.RUNNING;
  30. break;
  31. case RUNNING:
  32. case START_FAILED:
  33. case SHUTDOWN_ALREADY:
  34. throw new MQClientException("The producer service state not OK, maybe started once, "
  35. + this.serviceState
  36. + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
  37. null);
  38. default:
  39. break;
  40. }
  41. // broker的心跳机制发送
  42. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  43. }

浏览过这段代码,便可以很容易看出其中的核心逻辑在于MQClient的启动,以及producerbroker之间的心跳触发,这两部分是与consumer所共有的,会在公共行为部分进行剖析。

DefaultMQProducer的关闭

producer的关闭行为是producerRUNNING状态到SHUTDOWN_ALREADY状态的过程,主要做了三件事:

  • 从注册表中清除了本身实例;
  • 关闭了线程池资源;
  • 清楚了client的资源;

核心代码如下所示:

  1. this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
  2. this.defaultAsyncSenderExecutor.shutdown();
  3. if (shutdownFactory) {
  4. this.mQClientFactory.shutdown();
  5. }

TransactionMQProducer

TransactionMQProducer的启动

事务生产者TransactionMQProducer是默认的生产者的一个增强,它在启动方面多了一步操作:

  1. public void start() throws MQClientException {
  2. this.defaultMQProducerImpl.initTransactionEnv();
  3. super.start();
  4. }

可以看出多的这一步则是在事务方面的一个初始化,即事务检查的线程池初始化,其源码如下所示:

  1. public void initTransactionEnv() {
  2. TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
  3. if (producer.getExecutorService() != null) {
  4. this.checkExecutor = producer.getExecutorService();
  5. } else {
  6. this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
  7. this.checkExecutor = new ThreadPoolExecutor(
  8. producer.getCheckThreadPoolMinSize(),
  9. producer.getCheckThreadPoolMaxSize(),
  10. 1000 * 60,
  11. TimeUnit.MILLISECONDS,
  12. this.checkRequestQueue);
  13. }
  14. }

TransactionMQProducer的关闭

同样,事务生产者在关闭的时候也要多一步关闭事务事务检查线程池资源的操作,具体就不在详细说明。

Consumer

PushConsumer

PushConsumer的启动

push模式的消费者同生产者一样,在启动的时候依然是先包装group,然后在执行核心流程,最后在根据情况初始化trace。其源码如下:

  1. public void start() throws MQClientException {
  2. setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
  3. this.defaultMQPushConsumerImpl.start();
  4. if (null != traceDispatcher) {
  5. try {
  6. traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
  7. } catch (MQClientException e) {
  8. log.warn("trace dispatcher start failed ", e);
  9. }
  10. }
  11. }

接下来便仔细说明下核心初始化过程,其余是其和producer所共有的,会在公共行为部分进行剖析。

PushConsumer核心初始化

这里直接附上核心的start过程源码。其一样的走CREATE_JUST分支,如下所示:

  1. public synchronized void start() throws MQClientException {
  2. switch (this.serviceState) {
  3. case CREATE_JUST:
  4. log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
  5. this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
  6. this.serviceState = ServiceState.START_FAILED;
  7. // 校验group参数
  8. this.checkConfig();
  9. // 进行拷贝注册(拷贝topic,监听器等)
  10. this.copySubscription();
  11. if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
  12. // 如果是集群模式则根据PID改变实例名
  13. this.defaultMQPushConsumer.changeInstanceNameToPID();
  14. }
  15. // 创建MQClient实例
  16. this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
  17. // 设置均衡负载
  18. this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
  19. this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
  20. this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
  21. this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
  22. // pull-api包装
  23. this.pullAPIWrapper = new PullAPIWrapper(
  24. mQClientFactory,
  25. this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
  26. // 注册消息过滤钩子
  27. this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
  28. // 复制offset-store
  29. if (this.defaultMQPushConsumer.getOffsetStore() != null) {
  30. this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
  31. } else {
  32. switch (this.defaultMQPushConsumer.getMessageModel()) {
  33. case BROADCASTING:
  34. // 广播模式设置为本地文件存储模式
  35. this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
  36. break;
  37. case CLUSTERING:
  38. // 集群模式将偏移两设置为远程broker存储
  39. this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
  40. break;
  41. default:
  42. break;
  43. }
  44. this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
  45. }
  46. // 加载消费偏移情况
  47. this.offsetStore.load();
  48. if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
  49. // 有序消费监听器
  50. this.consumeOrderly = true;
  51. this.consumeMessageService =
  52. new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
  53. } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
  54. // 并发消费监听器
  55. this.consumeOrderly = false;
  56. this.consumeMessageService =
  57. new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
  58. }
  59. // 启动消费消息的服务(这里的start其实是启动了一个定时任务用于清理消费超时的消息)
  60. this.consumeMessageService.start();
  61. //向MQ客户端的consumer表中注册
  62. boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
  63. if (!registerOK) {
  64. this.serviceState = ServiceState.CREATE_JUST;
  65. this.consumeMessageService.shutdown();
  66. throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
  67. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
  68. null);
  69. }
  70. // mqclient启动
  71. mQClientFactory.start();
  72. log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
  73. this.serviceState = ServiceState.RUNNING;
  74. break;
  75. case RUNNING:
  76. case START_FAILED:
  77. case SHUTDOWN_ALREADY:
  78. throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
  79. + this.serviceState
  80. + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
  81. null);
  82. default:
  83. break;
  84. }
  85. // 更新topic注册数据
  86. this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  87. // 检测topic所在的broker是否能够连接成功
  88. this.mQClientFactory.checkClientInBroker();
  89. // 触发想broker的心跳
  90. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  91. // 直接触发一次平衡操作
  92. this.mQClientFactory.rebalanceImmediately();
  93. }

这里来看一下消费偏移的load情况,消费偏移的加载分从远程broker和本地文件加载,远程由RemoteBrokerOffsetStore来实现,本地由LocalFileOffsetStore来实现,具体使用哪儿种取决于是集群模式还是广播模式,者同样是影响消费偏移是持久化到本地还是远程的问题。

远程的加载其实没有做任何事情,源码如下所示:

  1. @Override
  2. public void load() {
  3. }

从本地文件加载则是会根据是否从文件中读取到内容决定,具体源码如下:

  1. public void load() throws MQClientException {
  2. // 从本地文件读取内容到包装器
  3. OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
  4. if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
  5. // 设置偏移量表
  6. offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
  7. // 打印日志
  8. for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
  9. AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
  10. log.info("load consumer's offset, {} {} {}",
  11. this.groupName,
  12. mq,
  13. offset.get());
  14. }
  15. }
  16. }

PushConsumer的关闭

push模式consumer在关闭的时候清理资源的情况如下所示:

  1. public synchronized void shutdown() {
  2. switch (this.serviceState) {
  3. case CREATE_JUST:
  4. break;
  5. case RUNNING:
  6. // 关闭消费消息服务
  7. this.consumeMessageService.shutdown();
  8. // 持久化消息的消费情况
  9. this.persistConsumerOffset();
  10. // 从注册表移除当前的消费者
  11. this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
  12. // 关闭MQClient资源
  13. this.mQClientFactory.shutdown();
  14. log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
  15. this.rebalanceImpl.destroy();
  16. this.serviceState = ServiceState.SHUTDOWN_ALREADY;
  17. break;
  18. case SHUTDOWN_ALREADY:
  19. break;
  20. default:
  21. break;
  22. }
  23. }

PullConsumer

PullConsumer的启动

pull模式的消费者在启动的时候则只有包装group和执行核心流程,其源码如下:

  1. public void start() throws MQClientException {
  2. this.setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
  3. this.defaultMQPullConsumerImpl.start();
  4. }

接下来便仔细说明下核心初始化过程,包装group部分会在公共行为部分进行剖析。

PullConsumer核心初始化

这里直接附上核心的start过程源码,其一样的走CREATE_JUST分支,如下所示:

  1. public synchronized void start() throws MQClientException {
  2. switch (this.serviceState) {
  3. case CREATE_JUST:
  4. this.serviceState = ServiceState.START_FAILED;
  5. // 相关配置检查
  6. this.checkConfig();
  7. // 进行拷贝注册
  8. this.copySubscription();
  9. if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
  10. // 如果是集群模式则更改实例名
  11. this.defaultMQPullConsumer.changeInstanceNameToPID();
  12. }
  13. // 获取MQ客户端实例
  14. this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
  15. // 设置pull模式的均衡策略
  16. this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
  17. this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
  18. this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
  19. this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
  20. // 构造pullAPI的包装器
  21. this.pullAPIWrapper = new PullAPIWrapper(
  22. mQClientFactory,
  23. this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
  24. this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
  25. // pull模式的消息消费偏移存储
  26. if (this.defaultMQPullConsumer.getOffsetStore() != null) {
  27. this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
  28. } else {
  29. switch (this.defaultMQPullConsumer.getMessageModel()) {
  30. case BROADCASTING:
  31. this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
  32. break;
  33. case CLUSTERING:
  34. this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
  35. break;
  36. default:
  37. break;
  38. }
  39. this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
  40. }
  41. // 加载消费偏移情况
  42. this.offsetStore.load();
  43. // 注册pull消费者
  44. boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
  45. if (!registerOK) {
  46. this.serviceState = ServiceState.CREATE_JUST;
  47. throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
  48. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
  49. null);
  50. }
  51. // MQClient的启动
  52. mQClientFactory.start();
  53. log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
  54. this.serviceState = ServiceState.RUNNING;
  55. break;
  56. case RUNNING:
  57. case START_FAILED:
  58. case SHUTDOWN_ALREADY:
  59. throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
  60. + this.serviceState
  61. + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
  62. null);
  63. default:
  64. break;
  65. }
  66. }

走览玩这段源码,可以看见,pull模式和push模式的消费者核心启动流程中,唯一的区别就是push模式下会有注册消息消费的监听器功能和最后检测连接操作,从这便可以看出,其实push模式是在pull模式上做了enhance。

PullConsumer的关闭

pull模式consumer在关闭的时候清理资源的情况如下所示:

  1. public synchronized void shutdown() {
  2. switch (this.serviceState) {
  3. case CREATE_JUST:
  4. break;
  5. case RUNNING:
  6. // 持久化消费偏移
  7. this.persistConsumerOffset();
  8. // 将消费者移除注册表
  9. this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
  10. // 关闭MQClient,清理资源
  11. this.mQClientFactory.shutdown();
  12. log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
  13. this.serviceState = ServiceState.SHUTDOWN_ALREADY;
  14. break;
  15. case SHUTDOWN_ALREADY:
  16. break;
  17. default:
  18. break;
  19. }
  20. }

公共行为

包装设置 group

对于group的设置,主要是在给予的group参数基础上加了一层名字空间域namespace,而在包装的过程中首先是要获取namespace的值,源码如下:

  1. public String getNamespace() {
  2. if (StringUtils.isNotEmpty(namespace)) {
  3. return namespace;
  4. }
  5. if (StringUtils.isNotEmpty(this.namesrvAddr)) {
  6. if (NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr)) {
  7. return NameServerAddressUtils.parseInstanceIdFromEndpoint(namesrvAddr);
  8. }
  9. }
  10. return namespace;
  11. }

这段源码来自ClientConfig,可以看见其是先尝试获取client中配置的值,如果值不存在的话便尝试从实例的Endpoint中获取,从端点中获取是通过匹正则来正则来判断的,如果匹配成功,则从InstanceEndpoint中截取namespace得值,正则表达式如下所示:

  1. ^http://MQ_INST_\w+_\w+\..*

只要没有做配置,那便获取不到值,因为不存在默认值。在执行完毕获取namespace值的操作后,便是真正的来包装group的值了。源码如下所示:

  1. public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {
  2. // 如果namespace为空或本身的group值为空
  3. if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {
  4. return resourceWithOutNamespace;
  5. }
  6. // 如果是系统的group或者已经包装过namespace
  7. if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {
  8. return resourceWithOutNamespace;
  9. }
  10. String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
  11. StringBuffer strBuffer = new StringBuffer();
  12. // RetryTopic
  13. if (isRetryTopic(resourceWithOutNamespace)) {
  14. strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
  15. }
  16. // DLQTopic
  17. if (isDLQTopic(resourceWithOutNamespace)) {
  18. strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
  19. }
  20. return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();
  21. }

消息轨迹功能开启

消息轨迹功能是异步的,其的核心也是一个producer,具体初始化话过程如下:

  1. public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
  2. if (isStarted.compareAndSet(false, true)) {
  3. traceProducer.setNamesrvAddr(nameSrvAddr);
  4. traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
  5. traceProducer.start();
  6. }
  7. this.accessChannel = accessChannel;
  8. this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
  9. this.worker.setDaemon(true);
  10. this.worker.start();
  11. this.registerShutDownHook();
  12. }

MQClient启动

这里直接来看MQClient的启动,如果是第一次启动则走CREATE_JUST分支,核心源码如下:

  1. public void start() throws MQClientException {
  2. synchronized (this) {
  3. switch (this.serviceState) {
  4. case CREATE_JUST:
  5. this.serviceState = ServiceState.START_FAILED;
  6. // If not specified,looking address from name server
  7. if (null == this.clientConfig.getNamesrvAddr()) {
  8. this.mQClientAPIImpl.fetchNameServerAddr();
  9. }
  10. // Start request-response channel
  11. this.mQClientAPIImpl.start();
  12. // Start various schedule tasks
  13. this.startScheduledTask();
  14. // Start pull service
  15. this.pullMessageService.start();
  16. // Start rebalance service
  17. this.rebalanceService.start();
  18. // Start push service
  19. this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
  20. log.info("the client factory [{}] start OK", this.clientId);
  21. this.serviceState = ServiceState.RUNNING;
  22. break;
  23. case RUNNING:
  24. break;
  25. case SHUTDOWN_ALREADY:
  26. break;
  27. case START_FAILED:
  28. throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
  29. default:
  30. break;
  31. }
  32. }
  33. }

可以看见,其主要是做了如下6件事:

  • 再次检查namesrv地址
  • 打开client的交互信道
  • 启动pull-message服务
  • 启动push-message服务
  • 启动均衡负载服务
  • 启动一系列定时任务

其中最核心的便是启动的这些定时任务,具体定时任务如下:

  • 定时刷新namesrv地址;本地的topic路由;
  • 定时处理broker(触发心跳,清理下线的);
  • 定时持久化消费者消费位置;
  • 定时处理消费者的线程池;

    private void startScheduledTask() {

    1. if (null == this.clientConfig.getNamesrvAddr()) {
    2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    3. @Override
    4. public void run() {
    5. try {
    6. MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
    7. } catch (Exception e) {
    8. log.error("ScheduledTask fetchNameServerAddr exception", e);
    9. }
    10. }
    11. }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    12. }
    13. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    14. @Override
    15. public void run() {
    16. try {
    17. MQClientInstance.this.updateTopicRouteInfoFromNameServer();
    18. } catch (Exception e) {
    19. log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
    20. }
    21. }
    22. }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    23. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    24. @Override
    25. public void run() {
    26. try {
    27. MQClientInstance.this.cleanOfflineBroker();
    28. MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
    29. } catch (Exception e) {
    30. log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
    31. }
    32. }
    33. }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    34. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    35. @Override
    36. public void run() {
    37. try {
    38. MQClientInstance.this.persistAllConsumerOffset();
    39. } catch (Exception e) {
    40. log.error("ScheduledTask persistAllConsumerOffset exception", e);
    41. }
    42. }
    43. }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    44. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    45. @Override
    46. public void run() {
    47. try {
    48. MQClientInstance.this.adjustThreadPool();
    49. } catch (Exception e) {
    50. log.error("ScheduledTask adjustThreadPool exception", e);
    51. }
    52. }
    53. }, 1, 1, TimeUnit.MINUTES);
    54. }

broker心跳触发

在触发心跳的操作中,具体分为两个部分,如下所示:

  1. // 触发心跳
  2. this.sendHeartbeatToAllBroker();
  3. // 上传加载消息过滤类
  4. this.uploadFilterClassSource();

上传消息过滤类资源是为了push模式下的消费者进行消息过滤用的,这里不做探究,直接来看启动触发心跳的核心代码,如下所示:

  1. private void sendHeartbeatToAllBroker() {
  2. final HeartbeatData heartbeatData = this.prepareHeartbeatData();
  3. final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
  4. final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
  5. if (producerEmpty && consumerEmpty) {
  6. log.warn("sending heartbeat, but no consumer and no producer");
  7. return;
  8. }
  9. if (!this.brokerAddrTable.isEmpty()) {
  10. long times = this.sendHeartbeatTimesTotal.getAndIncrement();
  11. Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
  12. while (it.hasNext()) {
  13. Entry<String, HashMap<Long, String>> entry = it.next();
  14. String brokerName = entry.getKey();
  15. HashMap<Long, String> oneTable = entry.getValue();
  16. if (oneTable != null) {
  17. for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
  18. Long id = entry1.getKey();
  19. String addr = entry1.getValue();
  20. if (addr != null) {
  21. if (consumerEmpty) {
  22. if (id != MixAll.MASTER_ID)
  23. continue;
  24. }
  25. try {
  26. // 在触发心跳的时候获取broker的版本号
  27. int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
  28. if (!this.brokerVersionTable.containsKey(brokerName)) {
  29. this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
  30. }
  31. this.brokerVersionTable.get(brokerName).put(addr, version);
  32. if (times % 20 == 0) {
  33. log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
  34. log.info(heartbeatData.toString());
  35. }
  36. } catch (Exception e) {
  37. if (this.isBrokerInNameServer(addr)) {
  38. log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
  39. } else {
  40. log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
  41. id, addr, e);
  42. }
  43. }
  44. }
  45. }
  46. }
  47. }
  48. }
  49. }

发表评论

表情:
评论列表 (有 0 条评论,146人围观)

还没有评论,来说两句吧...

相关阅读