【开发篇】二十、SpringBoot整合RocketMQ

港控/mmm° 2023-10-16 18:11 83阅读 0赞

文章目录

  • 1、整合
  • 2、消息的生产
  • 3、消费
  • 4、发送异步消息
  • 5、补充:安装RocketMQ

在这里插入图片描述

1、整合

首先导入起步依赖,RocketMQ的starter不是Spring维护的,这一点从starter的命名可以看出来(不是spring-boot-starter-xxx,而是xxx-spring-boot-starter,和MyBatisPlus、Druid一样),因此version值得自己加:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.2.1</version>
  5. </dependency>

添加相关配置:

  1. rocketmq:
  2. name-server: localhost:9876
  3. producer:
  4. group: group_rocketmq # 设置一个自定义的生产者默认组名,省掉这个启动会报错

在需要的地方注入RocketMQTemplate操作对象:

  1. @Autowired
  2. private RocketMQTemplate rocketMQTemplate;

2、消息的生产

发送消息继续convertAndSend方法,接着上篇在Service层来演示:

  1. @Service
  2. @Slf4j
  3. public class MessageServiceRocketmqImpl implements MessageService {
  4. @Autowired
  5. private RocketMQTemplate rocketMQTemplate;
  6. @Override
  7. public void sendMessage(String id) {
  8. rocketMQTemplate.convertAndSend("order_sm_id",id);
  9. log.info("使用Rabbitmq将待发送短信的订单纳入处理队列,id:"+id);
  10. }
  11. }

convertAndSend方法依旧重载,可以直接传一个Object,也可以先传一个destination参数,即发到哪儿,再传要发的message

3、消费

这里不演示手动receive方法拿消息,直接用监听器自动拿来消费:实现RocketMQListener接口,泛型为Message类型,重写onMessage方法,加@RocketMQMessageListener注解,两个属性为主题名称和消费者组

  1. @Component
  2. @Slf4j
  3. @RocketMQMessageListener(topic="order_sm_id",consumerGroup = "group_rocketmq")
  4. public class RocketmqMessageListener implements RocketMQListener<String> {
  5. @Override
  6. public void onMessage(String id) {
  7. log.info("已完成短信发送业务,id:"+id);
  8. }
  9. }

4、发送异步消息

  1. @Service
  2. @Slf4j
  3. public class MessageServiceRocketmqImpl implements MessageService {
  4. @Autowired
  5. private RocketMQTemplate rocketMQTemplate;
  6. @Override
  7. public void sendMessage(String id) {
  8. //回调逻辑
  9. SendCallback callback = new SendCallback() {
  10. @Override
  11. public void onSuccess(SendResult sendResult) {
  12. //消息发送成功后你要做的业务
  13. //...
  14. log.info("消息发送成功");
  15. }
  16. @Override
  17. public void onException(Throwable throwable) {
  18. log.info("消息发送失败!!!!!!!!!!!");
  19. }
  20. };
  21. //异步发送
  22. rocketMQTemplate.asyncSend("order_sm_id",id,callback);
  23. log.info("使用Rabbitmq将待发送短信的订单纳入处理队列,id:"+id);
  24. }
  25. }

asyncSend异步发消息,有个参数是callback回调方法,类型是一个接口,创建这个对象的时候重写onSuccess和OnException方法,即消息发送成功以后的逻辑和消息发送失败以后的逻辑(异步的体现,不用等,来个回调)。

5、补充:安装RocketMQ

建议以Docker方式启动,下面备份下在Windows的安装(安装为一个系统服务):

  • 下载

    下载地址:https://rocketmq.apache.org/

  • 安装:解压缩即可

    默认服务端口:9876

  • 环境变量配置

    ROCKETMQ_HOME
    PATH
    NAMESRV_ADDR (建议): 127.0.0.1:9876

  • 启动命名服务器:

    mqnamesrv

  • 启动Broker

    mqbroker

  • 服务器功能测试:生产数据

    tools org.apache.rocketmq.example.quickstart.Producer

  • 服务器功能测试:消费数据

    tools org.apache.rocketmq.example.quickstart.Consumer

发表评论

表情:
评论列表 (有 0 条评论,83人围观)

还没有评论,来说两句吧...

相关阅读