基于Rocket MQ扩展的无限延迟消息队列

亦凉 2024-05-23 20:31 158阅读 0赞

基于Rocket MQ扩展的无限延迟消息队列

背景:

  • Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的延迟消息无法实现该功能, 所以对方案进行了改造.

实现原理:

  • 简单而言, 就是在Rocket MQ延迟队列固定时间间隔的基础上, 通过多次发送延迟消息, 达到任意延时时间组合计算. 通过反射的方式, 实现延迟业务逻辑的调用.
  • 源码如下:
  • /*

    • Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.
      */
      package com.example.xxx.utils;

    import com.vevor.bmp.crm.common.constants.MQConstants;
    import lombok.Data;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import java.io.Serializable;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;

    /**

    • @version :1.8.0
    • @description :基于Rocket MQ的任意延迟时长工具
    • @program :user-growth
    • @date :Created in 2023/5/22 3:35 下午
    • @since :1.8.0
      */
      @Slf4j
      @Component
      @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,

      1. topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,
      2. // 消息消费顺序
      3. consumeMode = ConsumeMode.CONCURRENTLY,
      4. // 最大消息重复消费次数
      5. maxReconsumeTimes = 3)

      public class RocketMQDelayQueueUtils implements RocketMQListener> {

      /**

      • Rocket MQ客户端
        */
        @Resource
        private RocketMQTemplate rocketMQTemplate;

        /**

      • MQ默认延迟等级
        */
        private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,

        1. 30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,
        2. 480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};

        @SneakyThrows
        @Override
        public void onMessage(DelayTable message) {
        Date endTime = message.getEndTime();
        int delayLevel = getDelayLevel(endTime);
        // 继续延迟
        if (delayLevel != 0) {

        1. int currentDelayCount = message.getCurrentDelayCount();
        2. currentDelayCount++;
        3. message.setCurrentDelayCount(currentDelayCount);
        4. message.setCurrentDelayLevel(delayLevel);
        5. message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);
        6. this.sendDelayMessage(message);
        7. return;

        }

        // 执行业务
        log.info(“delay message end! start to process business…”);
        Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();
        if (messageHandler != null) {

        1. DelayMessageHandler delayMessageHandler = messageHandler.newInstance();
        2. delayMessageHandler.handle();

        }
        }

        /**

      • 延迟消息体
        *
      • @param 消息类型
        */
        @Data

        public static class DelayTable implements Serializable {

        private static final long serialVersionUID = 2405172041950251807L;

        /**

        • 延迟消息体
          */
          private E content;

          /**

        • 消息延迟结束时间
          */
          private Date endTime;

          /**

        • 总延迟毫秒数
          */
          private long totalDelayTime;

          /**

        • 总延迟时间单位
          */
          private TimeUnit totalDelayTimeUnit;

          /**

        • 当前延迟次数
          */
          private int currentDelayCount;

          /**

        • 当前延迟等级
          */
          private int currentDelayLevel;

          /**

        • 当前延迟毫秒数
          */
          private long currentDelayMillis;

          /**

        • 延迟处理逻辑
          */
          private Class<? extends DelayMessageHandler> messageHandler;
          }

        /**

      • 发送延迟消息
        *
      • @param message 消息体
      • @param delay 延迟时长
      • @param timeUnit 延迟时间单位
      • @param handler 延迟时间到了之后,需要处理的逻辑
      • @param 延迟消息类型
        */
        public void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {
        // 把延迟时间转换成时间戳(毫秒)
        long totalDelayMills = timeUnit.toMillis(delay);

        // 根据延迟时间计算结束时间
        Calendar instance = Calendar.getInstance();
        instance.add(Calendar.MILLISECOND, (int)totalDelayMills);
        Date endTime = instance.getTime();

        // 根据延迟时间匹配延迟等级(delay level)
        int delayLevel = getDelayLevel(endTime);
        long delayMillis = TIME_DELAY_LEVEL[delayLevel];

        // 发送消息
        DelayTable delayTable = new DelayTable<>();
        // 全局数据
        delayTable.setContent(message);
        delayTable.setMessageHandler(handler);
        delayTable.setEndTime(endTime);
        delayTable.setTotalDelayTime(delay);
        delayTable.setTotalDelayTimeUnit(timeUnit);

        // 当前延迟等级数据
        delayTable.setCurrentDelayCount(1);
        delayTable.setCurrentDelayLevel(delayLevel);
        delayTable.setCurrentDelayMillis(delayMillis);
        this.sendDelayMessage(delayTable);
        }

        /**

      • 计算延迟等级
        *
      • @param targetTime 延迟截止时间
      • @return Rocket MQ延迟消息等级
        */
        private static int getDelayLevel(Date targetTime) {
        long currentTime = System.currentTimeMillis();
        long delayMillis = targetTime.getTime() - currentTime;

        if (delayMillis <= 0) {

        1. // 不延迟,即延迟等级为 0
        2. return 0;

        }

        // 判断处于哪个延迟等级
        // 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h
        for (int i = 1; i <= 18; i++) {

        1. long delayLevelTime = TIME_DELAY_LEVEL[i];
        2. if (delayMillis < delayLevelTime) {
        3. return i - 1;
        4. } else if (delayMillis == delayLevelTime) {
        5. return i;
        6. }

        }

        // 最大延迟等级为 18
        return 18;
        }

        /**

      • 发送延迟消息
        *
      • @param delayTable 延迟对象,可以循环使用
        */
        @SneakyThrows
        private void sendDelayMessage(DelayTable delayTable) {
        // 消息序列化
        Message> message = MessageBuilder

        1. .withPayload(delayTable)
        2. .build();

        // 设置\发送延迟消息
        int delayLevel = delayTable.getCurrentDelayLevel();
        rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message

        1. , 3000, delayLevel);

        log.debug(“delay count: {}, delay level: {}, time: {} milliseconds”,

        1. delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);

        }

        /**

      • 延迟回调接口
        *
      • 回调逻辑必须实现该接口#hander()方法,在延迟结束后,会通过反射的方式调用该方法
        */
        public interface DelayMessageHandler extends Serializable {
        long serialVersionUID = 2405172041950251807L;

        /**

        • 回调函数
          */
          void handle();
          }
      • }

        测试代码:

        • /*

          • Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.
            */
            package com.vevor.bmp.crm.io.controller;

          import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils;
          import com.vevor.common.pojo.vo.ResponseResult;
          import lombok.Data;
          import lombok.SneakyThrows;
          import lombok.extern.slf4j.Slf4j;
          import org.redisson.api.RBlockingQueue;
          import org.redisson.api.RedissonClient;
          import org.springframework.web.bind.annotation.GetMapping;
          import org.springframework.web.bind.annotation.RequestParam;
          import org.springframework.web.bind.annotation.RestController;

          import javax.annotation.Resource;
          import java.util.concurrent.TimeUnit;

          /**

          • @version :1.8.0
          • @description :延迟队列测试
          • @program :user-growth
          • @date :Created in 2023/5/22 4:54 下午
          • @since :1.8.0
            */
            @Slf4j
            @RestController
            public class DelayQueueController {

            @Resource
            private RocketMQDelayQueueUtils rocketMQDelayQueueUtils;

            @GetMapping(“/mq/delay”)
            @SneakyThrows
            public ResponseResult mqDelay(@RequestParam Integer delay, @RequestParam String task) {

            1. // 获取延时队列
            2. rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class);
            3. return ResponseResult.success();

            }

            /**

            • @version :
            • @description :
            • @program :user-growth
            • @date :Created in 2023/5/23 2:11 下午
            • @since :
              */
              @Data
              public static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {

              /**

              • 回调函数
                */
                @Override
                public void handle() {
                log.info(“i am business logical! {}”, System.currentTimeMillis());
                }
                }
                }

        优缺点:

        • 优点: 与定时任务框架相比, 通过延迟消息的方式具实时性高、 支持分布式、轻量级、高并发等优点.
        • 缺点: 消息的准确性不可靠, 正常情况下准确性在秒级, 但是当MQ服务出现消息堆积时, 消息的时间就会偏差较大, 所以准确性依赖MQ服务的稳定.

发表评论

表情:
评论列表 (有 0 条评论,158人围观)

还没有评论,来说两句吧...

相关阅读

    相关 消息队列mq总结

    、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有A...

    相关 MQ消息队列学习

    1.概念:(message queue) 一种跨进程的通信机制,用于在上下游之间传递消息。消息发送上游只需要依赖MQ,逻辑上和物理上都不用依赖其他服务 详解参考:ht

    相关 消息队列MQ

    消息队列MQ 之前一直也没有学习过这方面的知识,直到项目中用到才决定学习一下 [https://www.jianshu.com/p/36a7775b04ec][htt