RocketMQ:延时消息

曾经终败给现在 2023-06-26 10:59 132阅读 0赞
  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.4.0</version>
  5. </dependency>

生产者

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. // 1.创建消息生产者producer,并制定生产者组名
  4. DefaultMQProducer producer = new DefaultMQProducer("group1");
  5. // 2.指定NameServer地址
  6. producer.setNamesrvAddr("localhost:9876");
  7. // 3.启动producer
  8. producer.start();
  9. for (int i = 0; i < 10; i++) {
  10. // 4.创建消息对象,指定主题Topic、Tag和消息体
  11. /*
  12. 参数1:消息主题Topic
  13. 参数2:消息Tag
  14. 参数3:消息内容
  15. */
  16. Message msg = new Message("DelayTopic","tag1",("hello world"+i).getBytes());
  17. // 设置延迟时间 RocketMq并不支持任意时间的延时,需要试着几个固定的延时等级
  18. // 从1s到2h分别对应着等级1到18 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  19. msg.setDelayTimeLevel(2);
  20. // 5.发送消息结果包含 发送状态 消息id 消息接收队列id等
  21. SendResult result = producer.send(msg);
  22. System.out.println("发送结果"+result);
  23. // 线程睡眠1秒
  24. TimeUnit.SECONDS.sleep(1);
  25. }
  26. // 6关闭生产者producer
  27. producer.shutdown();
  28. }
  29. }

消费者

  1. public class Consumer {
  2. public static void main(String[] args) throws MQClientException {
  3. // 1.创建消费者Consumer,制定消费者组名
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
  5. // 2.指定Nameserver地址
  6. consumer.setNamesrvAddr("localhost:9876");
  7. // 3.订阅主题Topic和Tag
  8. consumer.subscribe("DelayTopic","tag1");
  9. // 消费模式:默认是负载均衡模式,还有一种是广播模式
  10. // consumer.setMessageModel(MessageModel.BROADCASTING);
  11. // 4.设置回调函数,处理消息
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. //接收消息内容
  14. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  15. for (MessageExt messageExt : list) {
  16. System.out.println("消息id:【"+messageExt.getMsgId()+"】,延时时间:"+(System.currentTimeMillis()-messageExt.getStoreTimestamp()));
  17. }
  18. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  19. }
  20. });
  21. // 5.启动消费者consumer
  22. consumer.start();
  23. System.out.println("消费者启动");
  24. }
  25. }

发表评论

表情:
评论列表 (有 0 条评论,132人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ进阶-消息

        前言 在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像