RocketMQ原理学习--延时消息实现原理

阳光穿透心脏的1/2处 2022-04-05 17:26 388阅读 0赞
  1. RocketMQ提供了延时消息类型,简单来说就是生产者在发送消息的时候指定一个延时时间,当到达延时时间之后消息才能够被投送到消费者。
  2. 首先我们可以考虑一下RocketMQ的延时消息是如何实现:

(1)生产者将延时消息发送到Broker,Broker是如何区分普通消息和延时消息(消息类型)

(2)消息如何保证不被消费者拉取到的(RocketMQ将消息以SCHEDULE_TOPIC_XXXX为topic将延时消息持久化,等到达延时消息之后再以原有的topic重新保存)。

1、简单示例

(1)RocketMQ目前指定的延时时间间隔有1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h,用等级来表示时间间隔。

  1. public class DelayProducer {
  2. public static void main(String[] args) throws MQClientException, InterruptedException {
  3. DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
  4. producer.setNamesrvAddr("localhost:9876");
  5. producer.start();
  6. try {
  7. for (int i = 0; i < 3; i++) {
  8. Message msg = new Message("TopicA-test",// topic
  9. "TagA",// tag
  10. (new Date() + "Hello RocketMQ ,QuickStart 11" + i)
  11. .getBytes()// body
  12. );
  13. //1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
  14. // level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推
  15. msg.setDelayTimeLevel(2);
  16. SendResult sendResult = producer.send(msg);
  17. System.out.println(sendResult);
  18. }
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. producer.shutdown();
  23. }
  24. }

2、Broker持久化延时消息

  1. Broker对于接收到的消息首先会判断一下是不是延时消息,如果是延时消息会将消息以SCHEDULE\_TOPIC\_XXXXtopic替换原有的topic名称进行持久化,实现方法在CommitLogputMessage中。
  2. 首先会判断msg的延时标准如果大于0,则重新设置消息的topic名称和queueId,之后将消息以SCHEDULE\_TOPIC\_XXXXtopic,以延时时间的等级为queueId持久化到commitlog文件中。
  3. // Delay Delivery
  4. if (msg.getDelayTimeLevel() > 0) {
  5. if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
  6. msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
  7. }
  8. topic = ScheduleMessageService.SCHEDULE_TOPIC;
  9. queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
  10. // Backup real topic, queueId
  11. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
  12. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
  13. msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
  14. msg.setTopic(topic);
  15. msg.setQueueId(queueId);
  16. }

3、延时消息处理

  1. Broker将延时消息以SCHEDULE\_TOPIC\_XXXXtopic名称将消息进行持久化,接下来我们看看Broker是如何将消息在延时消息到达之后进行消息还原的。
  2. RocketMQ提供了定时任务服务ScheduleMessageService,通过定时任务的方式不断的读取topicSCHEDULE\_TOPIC\_XXXXqueueId为延时等级的消息进行消息还原处理,这样消息被还原之后消费者就可以拉取消息了。

每个消费等级有个定时任务DeliverDelayedMessageTimerTask:

  1. for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
  2. Integer level = entry.getKey();
  3. Long timeDelay = entry.getValue();
  4. Long offset = this.offsetTable.get(level);
  5. if (null == offset) {
  6. offset = 0L;
  7. }
  8. if (timeDelay != null) {
  9. this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
  10. }
  11. }

在DeliverDelayedMessageTimerTask中根据SCHEDULE_TOPIC_XXXX名称和延时等级对应的queueId获取消息队列,然后从commitlog中读取消息,还原消息的原有信息(消息的原topic信息)再将消息持久化到commitlog文件中,这样消费者就可以拉取消息了。

  1. public void executeOnTimeup() {
  2. //获取消费者消息
  3. ConsumeQueue cq =
  4. ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
  5. delayLevel2QueueId(delayLevel));
  6. long failScheduleOffset = offset;
  7. if (cq != null) {
  8. //读取消息
  9. SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
  10. if (bufferCQ != null) {
  11. try {
  12. long nextOffset = offset;
  13. int i = 0;
  14. ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
  15. for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
  16. long offsetPy = bufferCQ.getByteBuffer().getLong();
  17. int sizePy = bufferCQ.getByteBuffer().getInt();
  18. long tagsCode = bufferCQ.getByteBuffer().getLong();
  19. if (cq.isExtAddr(tagsCode)) {
  20. if (cq.getExt(tagsCode, cqExtUnit)) {
  21. tagsCode = cqExtUnit.getTagsCode();
  22. } else {
  23. //can't find ext content.So re compute tags code.
  24. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
  25. tagsCode, offsetPy, sizePy);
  26. long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
  27. tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
  28. }
  29. }
  30. long now = System.currentTimeMillis();
  31. long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
  32. nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
  33. long countdown = deliverTimestamp - now;
  34. if (countdown <= 0) {
  35. //获取消息
  36. MessageExt msgExt =
  37. ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
  38. offsetPy, sizePy);
  39. if (msgExt != null) {
  40. try {
  41. //还原消息信息topic名称等
  42. MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
  43. //重新将消息持久化到commitlog中
  44. PutMessageResult putMessageResult =
  45. ScheduleMessageService.this.defaultMessageStore
  46. .putMessage(msgInner);
  47. if (putMessageResult != null
  48. && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
  49. continue;
  50. } else {
  51. // XXX: warn and notify me
  52. log.error(
  53. "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
  54. msgExt.getTopic(), msgExt.getMsgId());
  55. ScheduleMessageService.this.timer.schedule(
  56. new DeliverDelayedMessageTimerTask(this.delayLevel,
  57. nextOffset), DELAY_FOR_A_PERIOD);
  58. ScheduleMessageService.this.updateOffset(this.delayLevel,
  59. nextOffset);
  60. return;
  61. }
  62. } catch (Exception e) {
  63. /*
  64. * XXX: warn and notify me
  65. */
  66. log.error(
  67. "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
  68. + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
  69. + offsetPy + ",sizePy=" + sizePy, e);
  70. }
  71. }
  72. } else {
  73. ScheduleMessageService.this.timer.schedule(
  74. new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
  75. countdown);
  76. ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
  77. return;
  78. }
  79. } // end of for
  80. nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
  81. ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
  82. this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
  83. ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
  84. return;
  85. } finally {
  86. bufferCQ.release();
  87. }
  88. } // end of if (bufferCQ != null)
  89. else {
  90. long cqMinOffset = cq.getMinOffsetInQueue();
  91. if (offset < cqMinOffset) {
  92. failScheduleOffset = cqMinOffset;
  93. log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
  94. + cqMinOffset + ", queueId=" + cq.getQueueId());
  95. }
  96. }
  97. } // end of if (cq != null)
  98. ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
  99. failScheduleOffset), DELAY_FOR_A_WHILE);
  100. }

总结:

  1. 延时消息的实现还是挺精巧的,首先将延时消息换了一个topic名称进行持久化,这样消费者就无法获取消息,然后有定时任务,会将消息还原到原有的topic信息,这样消费者又可以重新拉取消息了。

发表评论

表情:
评论列表 (有 0 条评论,388人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ进阶-消息

        前言 在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像

    相关 RocketMQ原理学习--消息类型

    一、消费模式 集群消费:当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。 广播消费:当使用广播消费模式时,MQ 会将每条消息推送给