springboot simple (12) springboot RabbitMQ

怼烎@ 2024-03-30 13:12 187阅读 0赞

这里首先简单的介绍了RabbitMQ ,然后实现了springboot集成RabbitMQ ,包含两个工程:
1 Producer 生产者
2 Consumer 消费者

1 RabbitMQ 简介

AMQP :Advanced Message Queue,高级消息队列协议。
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

RabbitMQ是一个生产者和消费者模型,用于实现消息的接收、存储、转发。
模型如下图所示:
在这里插入图片描述
Producer 生产者
Consumer 消费者
Exchange 交换机
Queue :队列,存储消息

常用的交换机Exchange
1)fanout Exchange(扇形交换机)
2)direct Exchange(直连交换机)
3)topic Exchange(主题交换机)
4)headers Exchange(头交换机)

2 springboot 集成RabbitMQ

第1步:pom中引入依赖的包:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. <version>3.0.1</version>
  5. </dependency>

第2步:application.properties设置相关参数:

  1. #rabbitmq相关配置
  2. spring.rabbitmq.host=127.0.0.1
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. spring.rabbitmq.publisher-confirm-type=correlated
  7. spring.rabbitmq.publisher-returns=true
  8. spring.main.allow-bean-definition-overriding = true

2.1 fanout Exchange(扇形交换机)

特点:将消息路由到所有与该交换机绑定的队列中。
在这里插入图片描述
第1步:创建配置类FanoutConfig,创建了:
交换机(exchange):fanout_exchange
队列(Queue):fanout_queue_A
绑定(Binding):fanoutBindA
队列(Queue):fanout_queue_B
绑定(Binding):fanoutBindB

  1. @Configuration
  2. public class FanoutConfig {
  3. @Bean("fanout_exchange")
  4. public FanoutExchange fanoutExchange() {
  5. return ExchangeBuilder.fanoutExchange("fanout_exchange").durable(true).build();
  6. }
  7. @Bean("fanout_queue_A")
  8. public Queue fanoutQueueA(){
  9. return QueueBuilder.durable("fanout_queue_A").build();
  10. }
  11. @Bean
  12. public Binding fanoutBindA(){
  13. return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
  14. }
  15. @Bean("fanout_queue_B")
  16. public Queue fanoutQueueB(){
  17. return QueueBuilder.durable("fanout_queue_B").build();
  18. }
  19. @Bean
  20. public Binding fanoutBindB(){
  21. return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
  22. }
  23. }

第2步:Producer 项目创建发送消息的类FanoutService :

  1. @Slf4j
  2. @Service
  3. public class FanoutService {
  4. String exchange = "fanout_exchange";
  5. @Resource
  6. private RabbitTemplate rabbitTemplate;
  7. public void sendData(String data) {
  8. log.info("------------fanout producer begin------------");
  9. rabbitTemplate.convertAndSend(exchange, "", data);
  10. log.info("exchange:"+exchange);
  11. log.info("send data:"+data);
  12. log.info("------------fanout producer end------------");
  13. }
  14. }

第3步:Consumer 创建接收消息的类FanoutListener :

  1. @Component
  2. @Slf4j
  3. public class FanoutListener {
  4. /**
  5. * .接收队列fanout_queue_A的数据
  6. */
  7. @RabbitListener(queues = {
  8. "fanout_queue_A"})
  9. @RabbitHandler
  10. public void receiveDataFromQueueA(Message message, Channel channel) throws IOException {
  11. long tag = message.getMessageProperties().getDeliveryTag();
  12. channel.basicAck( tag, true);
  13. log.info("------------fanout consumer begin------------");
  14. log.info("queue: fanout_queue_A");
  15. log.info("receive Data: " + new String(message.getBody()));
  16. log.info("------------fanout consumer end------------");
  17. }
  18. /**
  19. * .接收队列fanout_queue_B的数据
  20. */
  21. @RabbitListener(queues = {
  22. "fanout_queue_B"})
  23. @RabbitHandler
  24. public void receiveDataFromQueueB(Message message, Channel channel) throws IOException {
  25. long tag = message.getMessageProperties().getDeliveryTag();
  26. channel.basicAck( tag, true);
  27. log.info("------------fanout consumer begin------------");
  28. log.info("queue: fanout_queue_B");
  29. log.info("receive Data: " + new String(message.getBody()));
  30. log.info("------------fanout consumer end------------");
  31. }
  32. }

2.2 direct Exchange(直连交换机)

特点:根据消息默认的路由键(队列名称)将消息投递到相应的队列中。
在这里插入图片描述
第1步:创建配置类DirectConfig,创建了:
交换机(exchange):direct_exchange_A
队列(Queue):direct_queue_A
绑定(Binding):directBindA

交换机(exchange):direct_exchange_B
队列(Queue):direct_queue_B
绑定(Binding):directBindB

  1. @Configuration
  2. public class DirectConfig {
  3. @Bean("direct_exchange_A")
  4. public DirectExchange directExchangeA() {
  5. return ExchangeBuilder.directExchange("direct_exchange_A").durable(true).build();
  6. }
  7. @Bean("direct_queue_A")
  8. public Queue directQueueA() {
  9. return QueueBuilder.durable("direct_queue_A").build();
  10. }
  11. @Bean
  12. public Binding directBindA(@Qualifier("direct_queue_A")Queue queue,
  13. @Qualifier("direct_exchange_A")DirectExchange directExchange){
  14. return BindingBuilder.bind(queue)
  15. .to(directExchange).withQueueName();
  16. }
  17. @Bean("direct_exchange_B")
  18. public DirectExchange directExchangeB() {
  19. return ExchangeBuilder.directExchange("direct_exchange_B").durable(true).build();
  20. }
  21. @Bean("direct_queue_B")
  22. public Queue directQueueB() {
  23. return QueueBuilder.durable("direct_queue_B").build();
  24. }
  25. @Bean
  26. public Binding directBindB(@Qualifier("direct_queue_B")Queue queue,
  27. @Qualifier("direct_exchange_B")DirectExchange directExchange){
  28. return BindingBuilder.bind(queue)
  29. .to(directExchange).withQueueName();
  30. }
  31. }

.withQueueName() 可见绑定key为queue名称。

第2步:Producer 项目创建发送消息的类DirectService :

  1. @Slf4j
  2. @Service
  3. public class DirectService {
  4. String routeKey = "direct_queue_A";
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. public void sendData(String data) {
  8. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  9. log.info("------------direct producer begin------------");
  10. rabbitTemplate.convertAndSend(routeKey, data);
  11. log.info("routeKey:"+routeKey);
  12. log.info("send data:"+data);
  13. log.info("------------direct producer end------------");
  14. }
  15. }

第3步:Consumer 创建接收消息的类DirectListener :

  1. @Component
  2. @Slf4j
  3. public class DirectListener {
  4. /**
  5. * .接收队列direct_queue_A的数据
  6. */
  7. @RabbitListener(queues = {
  8. "direct_queue_A"})
  9. public void receiveDataFromQueueA(Message message, Channel channel) throws IOException {
  10. long tag = message.getMessageProperties().getDeliveryTag();
  11. channel.basicAck(tag, true);
  12. log.info("------------direct consumer begin------------");
  13. log.info("queue: fanout_queue_A");
  14. log.info("receive Data: " + new String(message.getBody()));
  15. log.info("------------direct consumer end------------");
  16. }
  17. /**
  18. * .接收队列direct_queue_B的数据
  19. */
  20. @RabbitListener(queues = {
  21. "direct_queue_B"})
  22. public void receiveDataFromQueueB(Message message, Channel channel) throws IOException {
  23. long tag = message.getMessageProperties().getDeliveryTag();
  24. channel.basicAck(tag, true);
  25. log.info("------------direct consumer begin------------");
  26. log.info("queue: fanout_queue_B");
  27. log.info("receive Data: " + new String(message.getBody()));
  28. log.info("------------direct consumer end------------");
  29. }
  30. }

2.3 topic Exchange(主题交换机)

特点:将消息路由到路由键(RoutingKey)和绑定键(BindKey)相匹配的队列中。
在这里插入图片描述

第1步:创建配置类TopicConfig,创建了:
交换机(exchange):topic_exchange
队列(Queue):topic_queue_A
绑定(Binding):topicBindA
绑定key(BindKey):project1.station1.*
交换机(exchange):topic_exchange
队列(Queue):topic_queue_B
绑定(Binding):topicBindB
绑定key(BindKey):project1..

  1. @Configuration
  2. public class TopicConfig {
  3. String routeKeyA = "project1.station1.*";
  4. String routeKeyB = "project1.*.*";
  5. @Bean("topic_exchange")
  6. public TopicExchange topicExchange() {
  7. return ExchangeBuilder.topicExchange("topic_exchange").durable(true).build();
  8. }
  9. @Bean("topic_queue_A")
  10. public Queue topicQueueA(){
  11. return QueueBuilder.durable("topic_queue_A").build();
  12. }
  13. @Bean
  14. public Binding topicBindA(){
  15. return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(routeKeyA);
  16. }
  17. @Bean("topic_queue_B")
  18. public Queue topicQueueB(){
  19. return QueueBuilder.durable("topic_queue_B").build();
  20. }
  21. @Bean
  22. public Binding topicBindB(){
  23. return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(routeKeyB);
  24. }
  25. }

第2步:Producer 项目创建发送消息的类TopicService :

  1. @Slf4j
  2. @Service
  3. public class TopicService {
  4. String exchange = "topic_exchange";
  5. //String routeKey = "project1.station1.device1";
  6. String routeKey = "project1.station2.device1";
  7. @Resource
  8. private RabbitTemplate rabbitTemplate;
  9. public void sendData(String data) {
  10. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  11. log.info("------------topic producer begin------------");
  12. rabbitTemplate.convertAndSend(exchange, routeKey, data);
  13. log.info("routeKey:"+routeKey);
  14. log.info("send data:"+data);
  15. log.info("------------topic producer end------------");
  16. }
  17. }

第3步:Consumer 创建接收消息的类TopicListener :

  1. @Component
  2. @Slf4j
  3. public class TopicListener {
  4. /**
  5. * .接收队列topic_queue_A的数据
  6. */
  7. @RabbitListener(queues = {
  8. "topic_queue_A"})
  9. public void receiveDataFromQueueA(Message message, Channel channel) throws IOException {
  10. long tag = message.getMessageProperties().getDeliveryTag();
  11. channel.basicAck( tag, true);
  12. log.info("------------topic_ consumer begin------------");
  13. log.info("queue: topic_queue_A");
  14. log.info("receive Data: " + new String(message.getBody()));
  15. log.info("------------topic_ consumer end------------");
  16. }
  17. /**
  18. * .接收队列topic_queue_B的数据
  19. */
  20. @RabbitListener(queues = {
  21. "topic_queue_B"})
  22. public void receiveDataFromQueueB(Message message, Channel channel) throws IOException {
  23. long tag = message.getMessageProperties().getDeliveryTag();
  24. channel.basicAck( tag, true);
  25. log.info("------------topic_ consumer begin------------");
  26. log.info("queue: topic_queue_B");
  27. log.info("receive Data: " + new String(message.getBody()));
  28. log.info("------------topic_ consumer end------------");
  29. }
  30. }

2.4 headers Exchange(头交换机)

特点:路由规则是建立在头属性值之上,而不是路由键。
在这里插入图片描述

第1步:创建配置类TopicConfig,创建了:
交换机(exchange):headers_exchange
队列(Queue):headers_queue_A
绑定(Binding):headersBindA
头(headers):
headerValues.put(“type”, “gass”);
headerValues.put(“data”, “realTime”);
匹配规则:whereAll

队列(Queue):headers_queue_B
绑定(Binding):headersBindB
头(headers):
headerValues.put(“type”, “gass”);
headerValues.put(“data”, “realTime”);
匹配规则:whereAny

  1. @Configuration
  2. public class HeadersConfig {
  3. @Bean("headers_exchange")
  4. public HeadersExchange headersExchange() {
  5. return ExchangeBuilder.headersExchange("headers_exchange").durable(true).build();
  6. }
  7. @Bean("headers_queue_A")
  8. public Queue headerQueueA(){
  9. return QueueBuilder.durable("headers_queue_A").build();
  10. }
  11. @Bean
  12. public Binding headersBindA(){
  13. Map<String,Object> headerValues = new HashMap<>();
  14. headerValues.put("type", "gass");
  15. headerValues.put("data", "realTime");
  16. return BindingBuilder.bind(headerQueueA()).to(headersExchange()).whereAll(headerValues).match();
  17. }
  18. @Bean("headers_queue_B")
  19. public Queue headerQueueB(){
  20. return QueueBuilder.durable("headers_queue_B").build();
  21. }
  22. @Bean
  23. public Binding headersBindB(){
  24. Map<String,Object> headerValues = new HashMap<>();
  25. headerValues.put("type", "gass");
  26. headerValues.put("data", "realTime");
  27. return BindingBuilder.bind(headerQueueB()).to(headersExchange()).whereAny(headerValues).match();
  28. }
  29. }

第2步:Producer 项目创建发送消息的类HeadersService :

  1. @Slf4j
  2. @Service
  3. public class HeadersService {
  4. String exchange = "headers_exchange";
  5. String routeKey = "";
  6. @Resource
  7. private RabbitTemplate rabbitTemplate;
  8. public void sendData(String data) {
  9. log.info("------------headers producer begin------------");
  10. MessageProperties messageProperties = new MessageProperties();
  11. messageProperties.setHeader("type", "gass");
  12. messageProperties.setHeader("data", "realTime");
  13. //messageProperties.setHeader("data", "history");
  14. Message message = new Message(data.getBytes(), messageProperties);
  15. rabbitTemplate.convertAndSend(exchange, routeKey, message);
  16. log.info("send data:"+data);
  17. log.info("------------headers producer end------------");
  18. }
  19. }

第3步:Consumer 创建接收消息的类HeadersListener :

  1. @Component
  2. @Slf4j
  3. public class HeadersListener {
  4. /**
  5. * .接收队列headers_queue_A的数据
  6. */
  7. @RabbitListener(queues = {
  8. "headers_queue_A"})
  9. public void receiveDataFromQueueA(Message message, Channel channel) throws IOException {
  10. long tag = message.getMessageProperties().getDeliveryTag();
  11. channel.basicAck( tag, true);
  12. log.info("------------headers consumer begin------------");
  13. log.info("queue: headers_queue_A");
  14. log.info("receive Data: " + new String(message.getBody()));
  15. log.info("------------headers consumer begin------------");
  16. }
  17. /**
  18. * .接收队列headers_queue_B的数据
  19. */
  20. @RabbitListener(queues = {
  21. "headers_queue_B"})
  22. public void receiveDataFromQueueB(Message message, Channel channel) throws IOException {
  23. long tag = message.getMessageProperties().getDeliveryTag();
  24. channel.basicAck( tag, true);
  25. log.info("------------headers consumer begin------------");
  26. log.info("queue: headers_queue_B");
  27. log.info("receive Data: " + new String(message.getBody()));
  28. log.info("------------headers consumer begin------------");
  29. }
  30. }

3 测试验证

3.1 fanout Exchange验证

第1步:postman发送请求:
http://127.0.0.1:8081/rabbitMq/sendMessageByFanout?msg=hello,fanout exchangge!
在这里插入图片描述
第2步:可以看到producer发送消息:
在这里插入图片描述
第3步:可以看到consumer接收消息:
在这里插入图片描述
可见:与交换机fanout_exchange绑定的两个队列fanout_queue_A、fanout_queue_B都收到了消息。

3.2 direct Exchange验证

第1步:postman发送请求:
http://127.0.0.1:8081/rabbitMq/sendMessageByDirect?msg=hello,direct exchangge!
在这里插入图片描述
第2步:可以看到producer发送消息:
在这里插入图片描述
第3步:可以看到consumer接收消息:
在这里插入图片描述
可见:routeKey 和BindKey都为queue名称direct_queue_A,队列direct_queue_A收到了消息。

3.3 topic Exchange验证

3.3.1 topic_queue_A、topic_queue_B都匹配

路由键(routeKey):project1.station1.device1
绑定key(BindKey):project1.station1.*
绑定key(BindKey):project1..

第1步:postman发送请求:
http://127.0.0.1:8081/rabbitMq/sendMessageByTopic?msg=hello,topic exchangge!
在这里插入图片描述
第2步:可以看到producer发送消息:
在这里插入图片描述
第3步:可以导刊consumer接收消息:
在这里插入图片描述
可以看到两个队列 topic_queue_A、topic_queue_B都收到了消息。

3.3.2 只有topic_queue_B匹配

路由键(routeKey):project1.station2.device1
绑定key(BindKey):project1.station1.*
绑定key(BindKey):project1..

第1步:postman发送请求:
http://127.0.0.1:8081/rabbitMq/sendMessageByTopic?msg=hello,topic exchangge!
在这里插入图片描述
第2步:可以看到producer发送消息:
在这里插入图片描述
第3步:可以看到consumer接收消息:
在这里插入图片描述
可以看到只有队列topic_queue_B收到了消息。

3.4 headers Exchange验证

3.4.1 headers_queue_A、headers_queue_B都匹配

发送消息的header:

  1. messageProperties.setHeader("type", "gass");
  2. messageProperties.setHeader("data", "realTime");

第1步:postman发送请求:
http://127.0.0.1:8081/rabbitMq/sendMessageByHeaders?msg=hello,headers exchangge!
在这里插入图片描述
第2步:可以看到producer发送消息:
在这里插入图片描述
第3步:可以看到consumer接收消息:
在这里插入图片描述
可见headers_queue_A、headers_queue_B都收到了消息。

3.4.2 只有headers_queue_B匹配

发送消息的header:

  1. messageProperties.setHeader("type", "gass");
  2. messageProperties.setHeader("data", "history");

第1步:postman发送请求:
http://127.0.0.1:8081/rabbitMq/sendMessageByHeaders?msg=hello,headers exchangge!
在这里插入图片描述

第2步:可以看到producer发送消息:
在这里插入图片描述
第3步:可以看到consumer接收消息:
在这里插入图片描述
可见:只有headers_queue_B收到了消息。

代码详见:
https://gitee.com/linghufeixia/springboot-simple
chapter6-4 producer项目
chapter6-5 consumer项目


教程列表
springboot simple(0) springboot简介
springboot simple(1) springboot Helloworld
springboot simple(2) springboot Starter
springboot simple(3 )springboot Web开发
springboot simple(4)springboot 数据持久化
springboot simple (5) springboot Nosql
springboot simple (6) springboot mqtt
springboot simple (7) springboot thrift
springboot simple (8) springboot kafka
springboot simple (9) springboot jpa(Hibernate)
springboot simple (10) springboot protobuf
springboot simple (11) springboot protostuff
springboot simple (12) springboot RabbitMQ

发表评论

表情:
评论列表 (有 0 条评论,187人围观)

还没有评论,来说两句吧...

相关阅读