RocketMQ延时消息使用

╰半夏微凉° 2024-03-22 18:24 175阅读 0赞

RocketMQ延时消息使用

生产者实例

  1. @Component
  2. public class DelayProducer {
  3. @Value("${rocketmq.producer.group}")
  4. private String producerGroup; // 生产者分组
  5. @Value("${rocketmq.name-server}")
  6. private String nameServer; // 服务地址
  7. @Value("${rocketmq.consumer.topic}")
  8. private String topic; // 生产者主题
  9. // RocketMQ(延时消息)生产者
  10. /*
  11. * tag:
  12. * OrderAutoCancel 订单自动取消
  13. * OrderAutoFinish 订单自动完成
  14. * OrderAutoReceive 订单自动收货
  15. * SendExpireWarn 发货超期提醒
  16. * WillPayExpire 将要超期付款提醒
  17. * PayExpire 超期付款提醒
  18. * delayTimeLevel:
  19. * 2 24小时
  20. * 3 7天
  21. * 4 15天
  22. * 5 57天
  23. * 6 60天
  24. * */
  25. //发送延时消息
  26. public void sendMsg(String tag,Integer parameter,Object object) throws Exception {
  27. // 实例化消息生产者Producer
  28. DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
  29. // 设置NameServer的地址
  30. producer.setNamesrvAddr(nameServer);
  31. // 启动Producer实例
  32. producer.start();
  33. // RockerMQUtil.TOPIC, RockerMQUtil.TAGS,null, JSON.toJSONString(orderJc).getBytes()
  34. // 参数一是 topic主题,消费者通过匹配topic进行消费,tags对应的是标签,一个topic有多个tags,keys是标识,body对应的是实体
  35. Message msg = new Message(topic
  36. , tag,null, JSON.toJSONString(object).getBytes());
  37. //"5s 1d 7d 15d 57d 60d"
  38. //设置消息延迟级别为6,也就是延迟60天。
  39. Integer delayTimeLevel = null;
  40. if(parameter == 24){
  41. // 如果延时时间为24小时,那设置他的等级为2
  42. delayTimeLevel = 2;
  43. }else if(parameter == 7){
  44. // 如果延时时间为7天,那设置他的等级为3
  45. delayTimeLevel = 3;
  46. }else if(parameter == 15){
  47. // 如果延时时间为15天,那设置他的等级为4
  48. delayTimeLevel = 4;
  49. }else if(parameter == 57){
  50. // 如果延时时间为57天,那设置他的等级为5
  51. delayTimeLevel = 5;
  52. }else if(parameter == 60){
  53. // 如果延时时间为60天,那设置他的等级为6
  54. delayTimeLevel = 6;
  55. }else if(parameter ==30){
  56. delayTimeLevel = 7;
  57. }else{
  58. delayTimeLevel = 1; // 如果都不是,那设置延时时间为5秒,等级为1
  59. }
  60. msg.setDelayTimeLevel(delayTimeLevel);
  61. // 发送消息到一个Broker
  62. producer.send(msg);
  63. // 如果不再发送消息,关闭Producer实例。
  64. producer.shutdown();
  65. }
  66. }

消费者实例

  1. @Component
  2. @RocketMQMessageListener(topic = RockerMQUtil.TOPIC, consumerGroup = RockerMQUtil.CONSUMERGROUP,
  3. messageModel = MessageModel.BROADCASTING)// 使用广播模式进行匹配
  4. public class Consumer implements RocketMQListener<MessageExt> {
  5. @Resource
  6. private IOrderService orderService;
  7. @Override
  8. public void onMessage(MessageExt message) {
  9. SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
  10. // 实例化消费者
  11. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RockerMQUtil.CONSUMERGROUP);
  12. // 设置NameServer的地址
  13. consumer.setNamesrvAddr(RockerMQUtil.NAMESRVADDR);
  14. // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
  15. try {
  16. consumer.subscribe(RockerMQUtil.TOPIC, "*");
  17. } catch (MQClientException e) {
  18. e.printStackTrace();
  19. }
  20. // 输出消息被消费的时间
  21. System.out.printf("%s %s Receive New Messages:%n"
  22. , sdf.format(new Date())
  23. , Thread.currentThread().getName());
  24. // 注册回调实现类来处理从broker拉取回来的消息
  25. String body=new String(message.getBody());
  26. OrderUtil entity = JSON.parseObject(body, OrderUtil.class);
  27. Order order = orderService.getOne(new QueryWrapper<Order>().eq("id",entity.getId())
  28. .select("status","id"));
  29. if(order.getStatus() == 5){
  30. // 在线付款订单超时未付款,取消订单
  31. orderService.update(new UpdateWrapper<Order>().eq("id",entity.getId())
  32. .set("status",7));
  33. System.out.println("订单取消成功");
  34. }
  35. System.out.printf("\tMsg Id: %s%n", message.getMsgId());
  36. System.out.printf("\tBody: %s%n", new String(message.getBody()));
  37. // 启动消费者实例
  38. try {
  39. consumer.start();
  40. } catch (MQClientException e) {
  41. e.printStackTrace();
  42. }
  43. System.out.println("Consumer Started.");
  44. }
  45. }

自定义延时时间

在服务器端(rocketmq-broker端)的属性配置文件broker.conf中加入以下行:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各级别与延时时间的对应映射关系。

  1. 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
  2. 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
  3. 默认值就是上面声明的,可手工调整;
  4. 默认值已够用,不建议修改这个值。

发表评论

表情:
评论列表 (有 0 条评论,175人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ进阶-消息

        前言 在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像