源码剖析RocketMQ延时消息原理

Myth丶恋晨 2023-10-01 16:08 164阅读 0赞

一、前言

RocketMQ版本4.8.0,本文中相关源码注释见GitHub中:RocketMQ:release-4.8.0。上一篇文章我们分析了RocketMQ的的消费超时/失败重试机制,最终会发送一个延时消息到Broker,本篇接着分析RockeTMQ延时消息的实现机制;

1、消息延时级别

消息的延时级别level一共有18级,分别为:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

2、定时消息(延迟消息)简介

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic;

  1. 定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。
  2. broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
  3. 注意:定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
  4. 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;

3、定时消息的使用

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQProducer producer = new DefaultMQProducer("producer_saint");
  4. producer.setNamesrvAddr("127.0.0.1:9876");
  5. producer.start();
  6. Message msg = new Message("consumer-timeout", "msg-body-001".getBytes(StandardCharsets.UTF_8));
  7. msg.setTags("msg-body-001");
  8. // 设置消息延时级别
  9. msg.setDelayTimeLevel(3);
  10. List<Message> list = new ArrayList<>();
  11. list.add(msg);
  12. SendResult send = producer.send(list);
  13. System.out.println("sendResult: " + send);
  14. }
  15. }

二、源码分析

1、整体实现流程

整体流程如下图,executeOnTimeup()方法部分代码太多,流程图中采用文字说明;
在这里插入图片描述

2、入口

ScheduleMessageService#start()

在Broker启动时会间接执行ScheduleMessageService#start(),执行启动延时消息服务操作;下面我们从Broker的核心类BrokerController中开始看起;

1> BrokerStartup#start()

这里是Broker启动的核心;关于Broker的启动流程,请参考这篇文章:RocketMQ:深度剖析Broker启动流程原理、源码
在这里插入图片描述

紧接着进入到BrokerController#start();在这里会启动消息持久化服务MessageStore

2> DefaultMessageStore#start()

在这里插入图片描述
MessageStore是一个接口,下面会进入到它的实现类DefaultMessageStore中;在DefaultMessageStore#start()方法中会判断Broker是否开启了DLegerCommitLog,如果没有并且Broker的角色不是Slave,才会开启延时消息服务

在这里插入图片描述
在这里插入图片描述

ScheduleMessageService#start()是开启延时消息服务的核心,下面我们接着看;

3、核心逻辑

ScheduleMessageService#start()中主要做两件事:

  1. 为每个延时级别都分别开启一个定时任务,每秒执行一次发送延迟消息到真实Topic的操作;
  2. 延时10s为每个延时级别都分别开启一个定时任务,每10s做一次延时队列中消息偏移量的持久化;

    public void start() {

    if (started.compareAndSet(false, true)) {

    1. this.timer = new Timer("ScheduleMessageTimerThread", true);
    2. // delayLevelTable中存放着每个延时级别和其对应的消息offset
    3. for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    4. Integer level = entry.getKey();
    5. Long timeDelay = entry.getValue();
    6. Long offset = this.offsetTable.get(level);
    7. if (null == offset) {
    8. offset = 0L;
    9. }
    10. if (timeDelay != null) {
    11. // 1、每秒从"SCHEDULE_TOPIC_XXXX" topic中取数据
    12. this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    13. }
    14. }
    15. // 2、延时10s启动,并且每10s把每一个延迟队列的最大消息偏移量写入到磁盘中
    16. this.timer.scheduleAtFixedRate(new TimerTask() {
  1. @Override
  2. public void run() {
  3. try {
  4. if (started.get()) ScheduleMessageService.this.persist();
  5. } catch (Throwable e) {
  6. log.error("scheduleAtFixedRate flush exception", e);
  7. }
  8. }
  9. }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
  10. }
  11. }

1> ScheduleMessageService.this.persist()

我们先看ScheduleMessageService.this.persist(),点进去会进去到ConfigManager#persist()方法中;这里的操作就单纯的持久化delayQueue的offset到delayOffset.json文件中;

  1. public synchronized void persist() {
  2. // 读取offsetTable缓存的延迟队列的值
  3. String jsonString = this.encode(true);
  4. if (jsonString != null) {
  5. // 读取delayOffset.json的文件地址
  6. String fileName = this.configFilePath();
  7. try {
  8. // 持久化到delayOffset.json文件中
  9. MixAll.string2File(jsonString, fileName);
  10. } catch (IOException e) {
  11. log.error("persist file " + fileName + " exception", e);
  12. }
  13. }
  14. }

2> DeliverDelayedMessageTimerTask

DeliverDelayedMessageTimerTaskTimerTask的子类,表示一个线程任务;其主要作用是扫描延迟消息队列(SCHEDULE_TOPIC_XXXX)的消息,将该延迟消息转换为真实topic的消息。

这里真实消息的topic有几个特殊之处:

  • 对于(并发消费模式下)消费超时重试的消息而言,真实的topic是%RETRY%+consumerGroup;
1)DeliverDelayedMessageTimerTask#run()

在这里插入图片描述
我们接着看看这个线程任务内部,执行了executeOnTimeup()方法;这是判断延时消息是否到应该转发到真实topic的核心逻辑。

2)executeOnTimeup()

由于代码篇幅过长,这里我们从五个问题着手分析;

  1. 根据延时级别获取在SCHEDULE_TOPIC_XXXX主题下queueId为delayLevel - 1的延时队列是否存在? 延时队列不存在怎么处理?
  2. 延时消息是否到期?到期后怎么处理?
  3. 延时消息到期后写入CommitLog失败怎么处理?
  4. 延时消息没到期怎么处理?
  5. 延时队列中的消息处理完怎么处理?

先看第一问题:

(1)根据延时级别获取在SCHEDULE_TOPIC_XXXX主题下queueId为delayLevel - 1的延时队列是否存在? 延时队列不存在怎么办?

判断当前延时级别对应在SCHEDULE_TOPIC_XXXX主题下的queue是否存在,如果存在进入第二个问题,否则隔0.1s再次开启当前TimerTask
在这里插入图片描述
再来看第二个问题:

(2)延时消息是否到期?到期后怎么处理?

在第一问题成立之后,我们已经获取到延时级别对应的延时队列,接下来首先要根据offset从ConsumeQueue中获取到延时消息的部分信息(offset、size、到期时间);接着再判断消息是否到期,并计算出下一个延时消息在延时队列中的offset
在这里插入图片描述
如果消息到期,再从commitlog中根据ofsetPy取出完整的消息,解析出消息的真实Topic和Queue,并清除消息的延时属性,然后将消息写入到CommitLog中;
在这里插入图片描述
对于如何判断消息过期,我们再跟一下correctDeliverTimestamp()方法;

从消息tagsCode属性中解析出消息应当被投递的时间,然后与当前时间做比较,判断是否应该进行投递(消息是否到期);

在这里插入图片描述

再来看第三个问题:

(3)延时消息到期后写入CommitLog失败怎么办?

如果写入CommitLog失败,则延时10s重新开启当前TimerTask,持久化delayOffset;
在这里插入图片描述

再来看第四个问题:

(4)延时消息没到期怎么处理?

如果消息未到期,则延时countdown(countdown为延时队列中第一个消息的剩余到期时间),开启一个TimerTask,并持久化delayOffset;
在这里插入图片描述

最后再看第五个问题:

(5)延时队列中的消息处理完怎么办?

遍历完相应延时级别的延时队列后,更新下一次开始读取延迟队列的offset,然后延时0.1s开启当前TimerTask,并持久化delayOffset;最后退出当前方法
在这里插入图片描述

executeOnTimeup()方法完成代码如下:

  1. public void executeOnTimeup() {
  2. // 根据延迟级别找到topic为SCHEDULE_TOPIC_XXXX的队列(队列的ID 为延时级别 - 1)
  3. ConsumeQueue cq =
  4. ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
  5. delayLevel2QueueId(delayLevel));
  6. long failScheduleOffset = offset;
  7. // 找到延时级别对应的队列
  8. if (cq != null) {
  9. SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
  10. if (bufferCQ != null) {
  11. try {
  12. long nextOffset = offset;
  13. int i = 0;
  14. ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
  15. for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
  16. // 消息的commitLog物理偏移量
  17. long offsetPy = bufferCQ.getByteBuffer().getLong();
  18. // 消息大小
  19. int sizePy = bufferCQ.getByteBuffer().getInt();
  20. // 延迟结束时间,在消息写入到CommitLog之后会分发到consumeQueue;
  21. // 对于延迟消息而言,tagsCode存储的是消息的延迟到期时间
  22. long tagsCode = bufferCQ.getByteBuffer().getLong();
  23. if (cq.isExtAddr(tagsCode)) {
  24. if (cq.getExt(tagsCode, cqExtUnit)) {
  25. tagsCode = cqExtUnit.getTagsCode();
  26. } else {
  27. //can't find ext content.So re compute tags code.
  28. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
  29. tagsCode, offsetPy, sizePy);
  30. long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
  31. tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
  32. }
  33. }
  34. long now = System.currentTimeMillis();
  35. // 计算是否到消息投递时间
  36. long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
  37. // 定时任务下一次开始读取延迟队列的offset
  38. nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
  39. // 剩余的延时时间
  40. long countdown = deliverTimestamp - now;
  41. if (countdown <= 0) {
  42. // 根据CommitLog物理偏移量找到msg
  43. MessageExt msgExt =
  44. ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
  45. offsetPy, sizePy);
  46. if (msgExt != null) {
  47. try {
  48. // 解析消息体,取出真实的topic和queue(多为%RETRY% + consumerGroup)
  49. MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
  50. if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
  51. log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
  52. msgInner.getTopic(), msgInner);
  53. continue;
  54. }
  55. // 将消息写入到commitLog中
  56. PutMessageResult putMessageResult =
  57. ScheduleMessageService.this.writeMessageStore
  58. .putMessage(msgInner);
  59. // 写入成功,跳过该次循环判断下一条延迟消息是否达到到期时间
  60. if (putMessageResult != null
  61. && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
  62. log.info("send msg to real topic: {} from schedule topic: {}", msgInner.getTopic() ,msgExt.getTopic());
  63. continue;
  64. } else {
  65. // XXX: warn and notify me
  66. log.error(
  67. "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
  68. msgExt.getTopic(), msgExt.getMsgId());
  69. // 写入消息失败,则延时10s重新执行TimerTask
  70. ScheduleMessageService.this.timer.schedule(
  71. new DeliverDelayedMessageTimerTask(this.delayLevel,
  72. nextOffset), DELAY_FOR_A_PERIOD);
  73. ScheduleMessageService.this.updateOffset(this.delayLevel,
  74. nextOffset);
  75. return;
  76. }
  77. } catch (Exception e) {
  78. /*
  79. * XXX: warn and notify me
  80. */
  81. log.error(
  82. "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
  83. + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
  84. + offsetPy + ",sizePy=" + sizePy, e);
  85. }
  86. }
  87. } else {
  88. // 这里说明,延时队列中最小到期的那条消息都还没到延迟时间
  89. // 重新提交一个TimerTask,延迟执行时间为延时队列中第一个消息剩余的延时时间
  90. ScheduleMessageService.this.timer.schedule(
  91. new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
  92. countdown);
  93. // 更新延时队列已消费的消息偏移量
  94. ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
  95. return;
  96. }
  97. } // end of for
  98. // 定时任务下一次开始读取延迟队列的offset
  99. nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
  100. // 开始一个延时100ms执行的定时任务 消费延时队列
  101. ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
  102. this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
  103. // 将下一次读取延迟队列的offset存放到一个缓存map中
  104. ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
  105. return;
  106. } finally {
  107. bufferCQ.release();
  108. }
  109. } // end of if (bufferCQ != null)
  110. else {
  111. long cqMinOffset = cq.getMinOffsetInQueue();
  112. if (offset < cqMinOffset) {
  113. failScheduleOffset = cqMinOffset;
  114. log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
  115. + cqMinOffset + ", queueId=" + cq.getQueueId());
  116. }
  117. }
  118. } // end of if (cq != null)
  119. ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
  120. failScheduleOffset), DELAY_FOR_A_WHILE);
  121. }

源码分析完了,我们来总结一下实现原理。

三、实现原理

消息的发送:

  1. producer设置消息的delayLevel延迟级别,并在消息属性DELAY中存储对应的延时级别;
  2. broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始的topic和queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC_XXXX),queueId改为(延时级别-1)

消息的处理:

  1. MQ服务端(Broker)的ScheduleMessageService中,为每一个延迟级别分别开启一个定时器,定时(每隔1秒)从延迟级别对应的的ConsumeQueue消费队列中拉取消息
  2. 然后根据消费偏移量offset从commitLog中解析出对应的消息
  3. 从消息tagsCode属性中解析出消息应当被投递的时间,然后与当前时间做比较,判断是否应该进行投递(消息是否到期);
  4. 若到达了投递时间(消息到期),则构建一个新的消息,从源消息属性中解析出出真实的topic和queueId,并清除消息的延迟属性;将其写入到CommitLog中

四、对延时消息机制的思考

优点:

  • 设计简单,把所有相同延迟时间的消息都先放到一个队列中,做定时扫描,可以保证消息消费的有序性;
  • 延时队列中的消息时按消息到期时间进行递增排序,也就是说队列中消息越靠前的到期时间越早;

缺点:

  • 延时消息机制所有的定时任务都在一个定时器中,定时器采用的java.util.Timer,而Timer是单线程运行的;如果延迟消息的数量很大大的话,可能单线程处理不过来,也就会造成消息到期后没有及时发送出去的现象,甚至会造成消息拥堵;
    在这里插入图片描述

可能的改进点:

  • 为每个延迟队列上分别采用一个Timer;
  • 或者说仅使用Timer开始定时任务做扫描,而消息处理的核心逻辑使用线程池处理,进而提高消息处理的效率;

发表评论

表情:
评论列表 (有 0 条评论,164人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ进阶-消息

        前言 在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像