SpringBoot集成RocketMQ使用延时消息与消息过滤

我会带着你远行 2024-03-31 13:17 213阅读 0赞

1.首先了解一下什么是延时消息

延时消息用来指定消息发送到消息队列(RocketMQ)的服务端后,延时一段时间之后才被投递到客户端进行消费(例如半分钟之后),适用于解决一些消息的生产和消费有窗口弹出要求的场景。例如:电商交易中超过时间未支付则关闭订单,在订单创建时,发送一条延时消息,这条消息将在30分钟以后投递给消费者,消费者受到此消息之后,判断对应的订单是否已支付,如果支付未完成则关闭订单,删除数据,恢复库存,如果已完成支付则忽略。比如:

dc167d2478094475ad40f9b17fedda78.png

先引入依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.9.0</version>
  5. </dependency>

1.创建延时消息生产者样例

  1. // 延时消息
  2. public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
  3. //创建 producer
  4. DefaultMQProducer producer = new DefaultMQProducer("tmXBL");
  5. //设置ip地址
  6. producer.setNamesrvAddr("192.168.1.7:9876");
  7. //开始加载
  8. producer.start();
  9. //编辑消息
  10. String body = "{userName:'Lix',hobby:'延时消息'}";
  11. //创建消息
  12. Message message = new Message("topicXBL","tagsXBL",body.getBytes());
  13. //设置延时消息
  14. //注意,这是设置消息的延时等级
  15. //1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
  16. //1s 5s 10s30s1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  17. message.setDelayTimeLevel(6);
  18. //发送消息
  19. SendResult send = producer.send(message);
  20. System.out.println(send);
  21. //关闭链接
  22. producer.shutdown();
  23. }

说明:现在往rocketMQ中发送一条延时消息(2分钟后roketMQ才会接收到)

2.创建延时消息消费者样例

  1. //消费延时消息
  2. public static void main(String[] args) throws MQClientException {
  3. //创建消费对象
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_XBL");
  5. //通过NameSrv设置ip+端口
  6. consumer.setNamesrvAddr("192.168.1.7:9876");
  7. //设置要消费信息的范围
  8. consumer.subscribe("topicXBL","tagsXBL");
  9. //设置消息监听
  10. consumer.registerMessageListener(new MessageListenerConcurrently() {
  11. @Override
  12. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  13. //setDelayLevelWhenNextConsume设置消费间隔
  14. consumeConcurrentlyContext.setDelayLevelWhenNextConsume(3);
  15. //获取每条消息的延时时间
  16. for (MessageExt messageExt : list) {
  17. byte[] body = messageExt.getBody();
  18. String str = new String(body);
  19. System.out.println(str);
  20. long storeTimestamp = messageExt.getStoreTimestamp();
  21. System.out.println("存储时间为:"+storeTimestamp);
  22. }
  23. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  24. }
  25. });
  26. //直接消费
  27. consumer.start();
  28. // //消费者需要关闭吗? 不用
  29. // //1、我们在项目中使用消息消费者,是需要持续不断的读取MQ中的消息,
  30. // //所以不用关闭
  31. // //2、在读取消息时,是自动开启另一线程,和当前代码不是同时执行
  32. // //可能造成,上面的consumeMessage方法还没走完,却已经consumer.shutdown了
  33. // //consumer.shutdown();
  34. }

3.消息过滤

注意,在我们需要使用过滤时,需要提前设置broker的允许过滤,否则将会报错:Exception in thread “main” org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92

怎么设置?

进入rocketmq的conf文件夹中,编辑broker.conf

添加一行:

enablePropertyFilter = true

保存退出即可

然后关闭RocketMQ

返回目录后使用

nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

ba8e49e967f24fdd9b54dcf37adb1539.png

ee25013baf3945af91c088195a1c2936.png

4. 准备一条消息后续过滤

  1. //RocketMQ的消息过滤
  2. public static void main(String[] args) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
  3. //我们使用RocketMQ消费消息时,大多数情况是会使用到Tags的,通过Tags我们可以过滤掉我们想要的数据
  4. //列如:consumer.subscribe("topicTmXBL","TagsTmXBL||TagsTmXBL2");
  5. //这种情况是消费者直接找标签TagsXBL或者TagsXBL2的数据,但是限制是一个消息只能有一个标签
  6. //对于复杂场景来说,可能就不够用了,我们可以使用SQL表达式来筛选信息,SQL特性可以通过
  7. //发送消息的属性来进行计算,使用RocketMQ可以实现一些简单的逻辑
  8. //例如 AND 、BETWEEN 、 >= 、 <= 、 <> 、 OR 、 IS 、 NULL 等等
  9. //首先需要知道:我们想要使用的SQL特性,不是说直接过滤Tags, 因为一个标签就是一个单词而已
  10. //一个标签对应一个业务, 我们可以在消息生成者的代码中,额外加一些属性,就可以完成过滤了
  11. //前两步不变
  12. DefaultMQProducer producer = new DefaultMQProducer("tm_XBL");
  13. producer.setNamesrvAddr("192.168.1.7:9876");
  14. //需要将producer对象启动起来
  15. producer.start();
  16. //编辑信息
  17. String body="userName:'Tom',hobby:'Jerry'";
  18. //创建消息
  19. Message message=new Message("topicTmXBL","tagsTmXBL","keysTmXBL",body.getBytes());
  20. //加上条件属性
  21. message.putUserProperty("isMember","0");
  22. message.putUserProperty("MemberLever","9");
  23. //发送信息
  24. producer.send(message);
  25. //关闭链接
  26. producer.shutdown();
  27. }

说明:这条消息跟其他消息不同点就我们使用 message.putUserProperty()设置2条属性,为后续过滤产生了条件

5.执行消费者过滤

  1. //消费消息时进行过滤
  2. public static void main(String[] args) throws MQClientException {
  3. //创建消费者对象
  4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tmConsumerXBL233");
  5. //通过NameSrv设置IP+端口
  6. consumer.setNamesrvAddr("192.168.1.7:9876");
  7. //消费消息时进行消息过滤,需要注意,如果一个消息被过滤掉了
  8. //(实际上已经消费了,但是不符合条件没有筛选出来),则当前消费组也在后续的消费中,不会继续消费已经过滤掉的消息。
  9. consumer.subscribe("topicTmXBL", MessageSelector.bySql("isMember <= 0 AND MemberLever >= 5 "));
  10. //设置消息监听
  11. consumer.registerMessageListener(new MessageListenerOrderly() {
  12. @Override
  13. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
  14. //list就是根据条件得到的数据
  15. //consumeOrderlyContext就是设置各种属性的上下文对象
  16. //是否设置消费的消息在 被消费后被标记为以消费,相当于是删除的意思
  17. consumeOrderlyContext.setAutoCommit(true);
  18. list.forEach(a->{
  19. byte[] body = a.getBody();
  20. String str = new String(body);
  21. System.out.println(str);
  22. });
  23. return ConsumeOrderlyStatus.SUCCESS;
  24. }
  25. });
  26. consumer.start();
  27. }

说明:现在使用MessageSelector.bySql(定义的sql条件)输出结果会显示一条数据,如果更改条件使得条件不成立则消息会被过滤,且当前消费组无法重复消费。

总结:

本文主要讲解rocketMQ在java中如何使用延时消息与消息过滤,使用起来比较简单多多支持点赞哈

发表评论

表情:
评论列表 (有 0 条评论,213人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ进阶-消息

        前言 在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像