Springboot整合一之Springboot整合RabbitMQ

港控/mmm° 2022-05-24 10:55 427阅读 0赞

前言

目前,springboot已然成为了最热的java开发整合框架,主要是因其简单的配置,并且本身提供了很多与第三方框架的整合,甚至可以让我们在短短的几分钟里就可以搭建一个完整的项目架构。所以,博主打算近期写一些springboot整合案例,也不知道先写哪个,那就从最近的写起吧, 言归正传。。。

今天博主要说的是Springboot整合RabbitMQ,那么,Let’s start…

1、新建一个springboot工程(博主这里使用的是IDEA)

70

关于具体怎么创建一个springboot工程,博主这里就不多做说明了(很easy的事情)。

2、添加依赖

这个大家再熟悉不过了,也很简单。

  1. <!-- rabbitmq依赖 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

3、在application.yml文件中进行RabbitMQ的相关配置

先上代码

  1. spring:
  2. rabbitmq:
  3. host: 116.85.33.91
  4. port: 5672
  5. username: guest
  6. password: password
  7. publisher-confirms: true # 消息发送到交换机确认机制,是否确认回调
  8. server:
  9. port: 8080

关于上面的几行配置相信用过springboot的同学都明白,这里主要做以下说明:

首先,确保你的RabbitMQ已经安装好,并且已经启动,在你的本地确实可以连接的上(关于RabbitMQ的安装和相关配置博主在这里不做多说,要不未免有些偏题,后续博主会专门出一篇文章详解的吐舌头)。

RabbitMQ的安装和相关配置 https://blog.csdn.net/zhuzhezhuzhe1/article/details/80464291

接下来,就可以开始撸代码了。。。先上一张图

70 1

4、进行相关配置

大家可以看到上图中的config包,这里就是相关配置类

70 2

  1. 这里博主一共写了三个配置类,其实我们完全可以把这三个写在一个配置类中,不过在这里博主为了便于大家理解,特地写了三个,在实际开发中,如果允许的话,博主也是建议写成三个(个人建议)![吐舌头][tongue.gif]。

下面,就这三个配置类,做下说明:(这里需要大家对RabbitMQ有一定的了解,知道生产者、消费者、消息交换机、队列等)

ExchangeConfig 消息交换机配置

  1. package com.space.rabbitmq.config;
  2. import org.springframework.amqp.core.DirectExchange;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * 消息交换机配置 可以配置多个
  7. * @author zhuzhe
  8. * @date 2018/5/25 15:40
  9. * @email 1529949535@qq.com
  10. */
  11. @Configuration
  12. public class ExchangeConfig {
  13. /**
  14. * 1.定义direct exchange,绑定queueTest
  15. * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
  16. * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
  17. * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
  18. * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
  19. * key: queue在该direct-exchange中的key值,当消息发送给direct-exchange中指定key为设置值时,
  20. * 消息将会转发给queue参数指定的消息队列
  21. */
  22. @Bean
  23. public DirectExchange directExchange(){
  24. DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE,true,false);
  25. return directExchange;
  26. }
  27. }
  28. QueueConfig 队列配置
  29. package com.space.rabbitmq.config;
  30. import org.springframework.amqp.core.Queue;
  31. import org.springframework.context.annotation.Bean;
  32. import org.springframework.context.annotation.Configuration;
  33. /**
  34. * 队列配置 可以配置多个队列
  35. * @author zhuzhe
  36. * @date 2018/5/25 13:25
  37. * @email 1529949535@qq.com
  38. */
  39. @Configuration
  40. public class QueueConfig {
  41. @Bean
  42. public Queue firstQueue() {
  43. /**
  44. durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
  45. auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
  46. exclusive 表示该消息队列是否只在当前connection生效,默认是false
  47. */
  48. return new Queue("first-queue",true,false,false);
  49. }
  50. @Bean
  51. public Queue secondQueue() {
  52. return new Queue("second-queue",true,false,false);
  53. }
  54. }
  55. RabbitMqConfig RabbitMq配置
  56. package com.space.rabbitmq.config;
  57. import com.space.rabbitmq.mqcallback.MsgSendConfirmCallBack;
  58. import org.springframework.amqp.core.AcknowledgeMode;
  59. import org.springframework.amqp.core.Binding;
  60. import org.springframework.amqp.core.BindingBuilder;
  61. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  62. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  63. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  64. import org.springframework.beans.factory.annotation.Autowired;
  65. import org.springframework.context.annotation.Bean;
  66. import org.springframework.context.annotation.Configuration;
  67. /**
  68. * RabbitMq配置
  69. * @author zhuzhe
  70. * @date 2018/5/25 13:37
  71. * @email 1529949535@qq.com
  72. */
  73. @Configuration
  74. public class RabbitMqConfig {
  75. /** 消息交换机的名字*/
  76. public static final String EXCHANGE = "exchangeTest";
  77. /** 队列key1*/
  78. public static final String ROUTINGKEY1 = "queue_one_key1";
  79. /** 队列key2*/
  80. public static final String ROUTINGKEY2 = "queue_one_key2";
  81. @Autowired
  82. private QueueConfig queueConfig;
  83. @Autowired
  84. private ExchangeConfig exchangeConfig;
  85. /**
  86. * 连接工厂
  87. */
  88. @Autowired
  89. private ConnectionFactory connectionFactory;
  90. /**
  91. 将消息队列1和交换机进行绑定
  92. */
  93. @Bean
  94. public Binding binding_one() {
  95. return BindingBuilder.bind(queueConfig.firstQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY1);
  96. }
  97. /**
  98. * 将消息队列2和交换机进行绑定
  99. */
  100. @Bean
  101. public Binding binding_two() {
  102. return BindingBuilder.bind(queueConfig.secondQueue()).to(exchangeConfig.directExchange()).with(RabbitMqConfig.ROUTINGKEY2);
  103. }
  104. /**
  105. * queue listener 观察 监听模式
  106. * 当有消息到达时会通知监听在对应的队列上的监听对象
  107. * @return
  108. */
  109. @Bean
  110. public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
  111. SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
  112. simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());
  113. simpleMessageListenerContainer.setExposeListenerChannel(true);
  114. simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
  115. simpleMessageListenerContainer.setConcurrentConsumers(1);
  116. simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  117. return simpleMessageListenerContainer;
  118. }
  119. /**
  120. * 定义rabbit template用于数据的接收和发送
  121. * @return
  122. */
  123. @Bean
  124. public RabbitTemplate rabbitTemplate() {
  125. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  126. /**若使用confirm-callback或return-callback,
  127. * 必须要配置publisherConfirms或publisherReturns为true
  128. * 每个rabbitTemplate只能有一个confirm-callback和return-callback
  129. */
  130. template.setConfirmCallback(msgSendConfirmCallBack());
  131. //template.setReturnCallback(msgSendReturnCallback());
  132. /**
  133. * 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,
  134. * 可针对每次请求的消息去确定’mandatory’的boolean值,
  135. * 只能在提供’return -callback’时使用,与mandatory互斥
  136. */
  137. // template.setMandatory(true);
  138. return template;
  139. }
  140. /**
  141. * 消息确认机制
  142. * Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
  143. * 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
  144. * 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
  145. * 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
  146. * @return
  147. */
  148. @Bean
  149. public MsgSendConfirmCallBack msgSendConfirmCallBack(){
  150. return new MsgSendConfirmCallBack();
  151. }
  152. }

消息回调

70 3

  1. package com.space.rabbitmq.mqcallback;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.amqp.rabbit.support.CorrelationData;
  4. /**
  5. * 消息发送到交换机确认机制
  6. * @author zhuzhe
  7. * @date 2018/5/25 15:53
  8. * @email 1529949535@qq.com
  9. */
  10. public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
  11. @Override
  12. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  13. System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);
  14. if (ack) {
  15. System.out.println("消息消费成功");
  16. } else {
  17. System.out.println("消息消费失败:" + cause+"\n重新发送");
  18. }
  19. }
  20. }

5、生产者/消息发送者

70 4

  1. package com.space.rabbitmq.sender;
  2. import com.space.rabbitmq.config.RabbitMqConfig;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.rabbit.support.CorrelationData;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. import java.util.UUID;
  9. /**
  10. * 消息发送 生产者1
  11. * @author zhuzhe
  12. * @date 2018/5/25 14:28
  13. * @email 1529949535@qq.com
  14. */
  15. @Slf4j
  16. @Component
  17. public class FirstSender {
  18. @Autowired
  19. private RabbitTemplate rabbitTemplate;
  20. /**
  21. * 发送消息
  22. * @param uuid
  23. * @param message 消息
  24. */
  25. public void send(String uuid,Object message) {
  26. CorrelationData correlationId = new CorrelationData(uuid);
  27. rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY2,
  28. message, correlationId);
  29. }
  30. }

6、消费者

70 5

  1. package com.space.rabbitmq.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * 消息消费者1
  6. * @author zhuzhe
  7. * @date 2018/5/25 17:32
  8. * @email 1529949535@qq.com
  9. */
  10. @Component
  11. public class FirstConsumer {
  12. @RabbitListener(queues = {"first-queue","second-queue"}, containerFactory = "rabbitListenerContainerFactory")
  13. public void handleMessage(String message) throws Exception {
  14. // 处理消息
  15. System.out.println("FirstConsumer {} handleMessage :"+message);
  16. }
  17. }

7、测试

70 6

  1. package com.space.rabbitmq.controller;
  2. import com.space.rabbitmq.sender.FirstSender;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import java.util.UUID;
  7. /**
  8. * @author zhuzhe
  9. * @date 2018/5/25 16:00
  10. * @email 1529949535@qq.com
  11. */
  12. @RestController
  13. public class SendController {
  14. @Autowired
  15. private FirstSender firstSender;
  16. @GetMapping("/send")
  17. public String send(String message){
  18. String uuid = UUID.randomUUID().toString();
  19. firstSender.send(uuid,message);
  20. return uuid;
  21. }
  22. }

此时,我们就可以启动项目,进行测试了

70 7

70 8

当我们访问send接口时,看到控制台打印如下信息,证明我们的消息已经发送,并且消费者也已经成功接收!

源码地址:https://github.com/zhuzhegithub/springboot-rabbitmq

转载请务必保留此出处(原作者):https://blog.csdn.net/zhuzhezhuzhe1

版权声明:本文为原创文章,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。

https://blog.csdn.net/zhuzhezhuzhe1

发表评论

表情:
评论列表 (有 0 条评论,427人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合RabbitMQ

    一 RabbitMQ的介绍     RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apac