【RocketMQ】SpringBoot集成RocketMQ

淡淡的烟草味﹌ 2024-03-24 18:34 196阅读 0赞

SpringBoot集成RocketMQ
首先依旧是引入依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.2.2</version>
  5. </dependency>

然后就可以编写发送不同类型消息的代码了

  1. package blossom.project.springbootkp.seckillproducer;
  2. import blossom.project.springbootkp.seckillproducer.entity.MsgModel;
  3. import com.alibaba.fastjson.JSONObject;
  4. import org.apache.rocketmq.client.producer.SendCallback;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  7. import org.apache.rocketmq.spring.support.RocketMQHeaders;
  8. import org.junit.jupiter.api.Test;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.boot.test.context.SpringBootTest;
  11. import org.springframework.messaging.Message;
  12. import org.springframework.messaging.support.MessageBuilder;
  13. import java.util.Arrays;
  14. import java.util.List;
  15. @SpringBootTest
  16. class SecKillProducerApplicationTests {
  17. private List<MsgModel> msgModels = Arrays.asList(
  18. new MsgModel("qwer", 1L, "下单"),
  19. new MsgModel("qwer", 1L, "短信"),
  20. new MsgModel("qwer", 1L, "物流"),
  21. new MsgModel("zxcv", 2L, "下单"),
  22. new MsgModel("zxcv", 2L, "短信"),
  23. new MsgModel("zxcv", 2L, "物流")
  24. );
  25. @Autowired
  26. private RocketMQTemplate rocketMQTemplate;
  27. @Test
  28. void syncProducer() {
  29. rocketMQTemplate.syncSend("bootTestTopic","使用springboot集成rocketmq");
  30. }
  31. @Test
  32. void asyncProducer(){
  33. rocketMQTemplate.asyncSend("bootTestTopic", "发送一条异步消息", new SendCallback() {
  34. @Override
  35. public void onSuccess(SendResult sendResult) {
  36. System.out.println("发送成功");
  37. }
  38. @Override
  39. public void onException(Throwable throwable) {
  40. System.out.println("发送失败"+throwable.getMessage());
  41. }
  42. });
  43. }
  44. @Test
  45. void oneWayProducer(){
  46. rocketMQTemplate.sendOneWay("bootTestTopic","发送一个单向消息");
  47. }
  48. @Test
  49. void delayProducer(){
  50. Message<String> message = MessageBuilder.withPayload("这是一条延迟消息").build();
  51. rocketMQTemplate.syncSend("bootTestTopic",message,3000,2);
  52. }
  53. @Test
  54. void orderedProducer(){
  55. msgModels.forEach(x->{
  56. String s = JSONObject.toJSONString(x);
  57. rocketMQTemplate.syncSendOrderly("orderlyTopic", s,x.getOrderSn());
  58. });
  59. }
  60. @Test
  61. void tagProducer(){
  62. rocketMQTemplate.syncSend("bootTestTopic:tagA","我是一个带标签的消息");
  63. }
  64. @Test
  65. void keyProducer(){
  66. Message<String> message = MessageBuilder.withPayload("我是一个带有key的消息").setHeader(RocketMQHeaders.KEYS, "testKey")
  67. .build();
  68. rocketMQTemplate.syncSend("bootTestTopic",message);
  69. }
  70. }
  71. 对于不同的消息类型,我们可以使用不同的方式去接收。
  72. 创建一个顺序消息的监听器
  73. @Component
  74. @RocketMQMessageListener(
  75. topic = "orderlyTopic",
  76. consumerGroup = "boot-orderly-consumer-group",
  77. consumeMode = ConsumeMode.ORDERLY, //顺序消费模式 单线程
  78. maxReconsumeTimes = 5) //最大重试次数
  79. public class OrderlyMessageListener implements RocketMQListener<MessageExt> {
  80. @Override
  81. public void onMessage(MessageExt messageExt) {
  82. MsgModel msgModel = JSON.parseObject(new String(messageExt.getBody()), MsgModel.class);
  83. System.out.println(msgModel);
  84. }
  85. }
  86. 普通的创建一个监听器
  87. @Component
  88. @RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-consumuer-group")
  89. public class SimpleMessageListener implements RocketMQListener<MessageExt> {
  90. /**
  91. * 这个方法就是消费者方法
  92. * 这里的String就是消息内容
  93. * 这里的泛型就是这里的参数类型
  94. * 如果泛型指定了固定的类型 那么消息体就是我们的参数
  95. * 如果我们的类型设定为具体的类型 那么我们只能拿到消息体
  96. * 而如果我们把消息类型设定为MessageExt类型,那么我们可以拿到消息头
  97. * ------------------------------------------------
  98. * 只要这个方法不报错 就会直接完成消息的接收 而如果报错了 就会重试
  99. * @param msg
  100. */
  101. @Override
  102. public void onMessage(MessageExt msg) {
  103. String keys = msg.getKeys();
  104. System.out.println("接收到的keys为"+keys);
  105. String body = new String(msg.getBody());
  106. System.out.println("接收到的消息体为"+body);
  107. }
  108. }
  109. 创建一个识别tag标签的监听器
  110. @RocketMQMessageListener(topic = "bootTestTopic",
  111. consumerGroup = "boot-tag-consumer-group",
  112. selectorType = SelectorType.TAG, //tag过滤模式
  113. selectorExpression = "tagA || tagB") //tag标签匹配模式
  114. public class TagMessageListener implements RocketMQListener<MessageExt> {
  115. @Override
  116. public void onMessage(MessageExt msg) {
  117. System.out.println(new String(msg.getBody()));
  118. }
  119. }

发表评论

表情:
评论列表 (有 0 条评论,196人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ集成SpringBoot

    前言 项目运行前需安装好RocketMQ与环境变量配置,对Rocket相关知识不了解的可按照以下文章顺序后再阅读该文: [RocketMQ安装配置][Rocket