02、RabbitMQ之交换机

冷不防 2023-06-24 15:26 62阅读 0赞

一、 Exchange(交换机)的作用
在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,
队列再将消息以推送或者拉取方式给消费者进行消费

  1. 创建消息 路由键 pull/push
  2. 生产者------------>交换机------------>队列------------>消费者

在这里插入图片描述

二、 Exchange(交换机)的类型

1)直连交换机:Direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的。

在这里插入图片描述
在这里插入图片描述
2)主题交换机:Topic Exchange
弥补直连交换机的缺点!
直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的routing_key,
假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息的管理就会异常地困难。

所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,
主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*…的格式,每个部分用.分开,其中
*表示一个单词
#表示任意数量(零个或多个)单词。

  1. 示例:
  2. 队列Q1绑定键为 *.TT.*
  3. 队列Q2绑定键为TT.#
  4. 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到
  5. 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到

3)扇形交换机:Fanout Exchange
扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。
扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,
所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

  1. 这个交换机没有路由键概念,就算你绑了路由键也是无视的。

4)首部交换机:Headers exchange

5)默认交换机

6)Dead Letter Exchange(死信交换机)

本文只讲前三种交换机

三、交换机的属性
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
Name:交换机名称
Durability:是否持久化。如果持久性,则RabbitMQ重启后,交换机还存在
Auto-delete:当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它
Arguments:扩展参数

**四、案例:交换机的使用 **

  1. 打开linux,开启docker,开启容器,容器的创建请参照
    https://blog.csdn.net/qq_43469718/article/details/103671005
    在这里插入图片描述
    输入界面版RabbitMQ
    在这里插入图片描述
  2. 创建springcloud项目
    rabbitmq02 #主模块
    rabbitmq-provider #生产者
    rabbitmq-consumer #消费者

    给父模块添加依赖

    <?xml version=”1.0” encoding=”UTF-8”?>


    4.0.0

    org.springframework.boot
    spring-boot-starter-parent
    2.1.9.RELEASE


    com.zking
    rabbitmq02
    0.0.1-SNAPSHOT
    rabbitmq02
    pom
    Demo project for Spring Boot


    rabbitmq-provider
    rabbitmq-consumer



    1.8




    org.springframework.boot
    spring-boot-starter-amqp



    org.springframework.boot
    spring-boot-starter-test
    test



    org.springframework.amqp
    spring-rabbit-test
    test



    org.springframework.boot
    spring-boot-starter-web



    org.projectlombok
    lombok
    1.18.10
    provided







    org.springframework.boot
    spring-boot-maven-plugin




子模块pom

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>com.zking</groupId>
  7. <artifactId>rabbitmq02</artifactId>
  8. <version>0.0.1-SNAPSHOT</version>
  9. </parent>
  10. <artifactId>rabbitmq-provider</artifactId>
  11. <version>0.0.1-SNAPSHOT</version>
  12. <name>rabbitmq-provider</name>
  13. <packaging>jar</packaging>
  14. <description>Demo project for Spring Boot</description>
  15. </project>
  16. <?xml version="1.0" encoding="UTF-8"?>
  17. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  18. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  19. <modelVersion>4.0.0</modelVersion>
  20. <parent>
  21. <groupId>com.zking</groupId>
  22. <artifactId>rabbitmq02</artifactId>
  23. <version>0.0.1-SNAPSHOT</version>
  24. </parent>
  25. <artifactId>rabbitmq-consumer</artifactId>
  26. <version>0.0.1-SNAPSHOT</version>
  27. <name>rabbitmq-consumer</name>
  28. <packaging>jar</packaging>
  29. <description>Demo project for Spring Boot</description>
  30. </project>

配置yml
rabbitmq-provider

  1. server:
  2. port: 8081
  3. servlet:
  4. context-path: /rabbitmq-provider
  5. spring:
  6. rabbitmq:
  7. virtual-host: my_vhost
  8. host: 192.168.208.130
  9. port: 5672
  10. username: admin
  11. password: admin

rabbitmq-consumer

  1. server:
  2. port: 8082
  3. servlet:
  4. context-path: /rabbitmq-consumer
  5. spring:
  6. rabbitmq:
  7. virtual-host: my_vhost
  8. host: 192.168.208.130
  9. port: 5672
  10. username: admin
  11. password: admin

在这里插入图片描述
配置完毕后


1.演示直连交换机:Direct Exchange
在生产者rabbitmq-provider新建包rabbitmq ,这个包专门用来放配置类
新建类RabbitDirect 配置类

  1. package com.zking.rabbitmqprovider.rabbitmq;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /* * 功能描述: <br> * 〈〉用于绑定队列交换机路由键的关系 * 示例:直接交换机(Driect Exchange) * @Param: * @Return: * @Author: 骄傲的骨傲天 * @Date: 2019/12/23 22:27 */
  9. @Configuration
  10. public class RabbitDirect {
  11. /* * 功能描述: <br> * 〈〉1.定义一队列 * @Param: * @Return: * @Author: 骄傲的骨傲天 * @Date: 2019/12/23 22:34 */
  12. @Bean
  13. public Queue queue(){
  14. return new Queue("queue-1",true);
  15. }
  16. /* * 功能描述: <br> * 〈〉2.定义直接交换机 * @Param: * @Return: * @Author: 骄傲的骨傲天 * @Date: 2019/12/23 22:34 */
  17. @Bean
  18. public DirectExchange directExchange(){
  19. return new DirectExchange("direct-exchange");
  20. }
  21. /* * 功能描述: <br> * 〈〉3.将队列与直连交换机进行绑定,并以路由键来串联 * @Param: * @Return: * @Author: 骄傲的骨傲天 * @Date: 2019/12/23 22:36 */
  22. @Bean
  23. public Binding directBinding(){
  24. return BindingBuilder.bind(queue()).
  25. to(directExchange()).with("direct-routing-key");
  26. }
  27. }

新建controller包与rabbitmq同级目录
新建类SendController用户来向交换机发送消息根据路由键传到消息队列中
此类头上有个@RestController注解

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. @RequestMapping("/senderByDricet")
  4. public Map<String,Object> senderByDricet(){
  5. Map<String,Object> map=new HashMap<String,Object>();
  6. Map<String, Object> data = this.createData();
  7. log.info("生产者发送消息,queue={},directExchange={},routingkey={}",
  8. "queue-1","direct-exchange","direct-routing-key");
  9. log.info("msg={}",data);
  10. //第一个参数:交换机的名称
  11. //第二个参数:路由键
  12. //第三个参数:数据
  13. rabbitTemplate.convertAndSend("direct-exchange","direct-routing-key",data);
  14. map.put("code",1);
  15. map.put("msg","消息发送成功");
  16. return map;
  17. }
  18. public Map<String,Object> createData(){
  19. Map<String,Object> data=new HashMap<String,Object>();
  20. String ceateDate= LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  21. String message="hello,rabbitmq!!";
  22. data.put("createDate",ceateDate);
  23. data.put("massage",message);
  24. return data;
  25. }

运行
postman测试一下
在这里插入图片描述
再看下RabbitMQ web版有没有此数据列表
在这里插入图片描述
可以看到名为queue-1的消息队列中是有一条消息的

往消费者rabbitmq-consumer写入代码,消费此消息
创建rabbitmq
创建RabbitDirectReceiver
开始消费

  1. package com.zking.rabbitmqconsumer.rabbitmq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Map;
  7. @Component
  8. @RabbitListener(queues = { "queue-1"})
  9. @Slf4j
  10. public class RabbitDirectReceiver {
  11. @RabbitHandler
  12. public void handlerMessage(Map<String,Object> map){
  13. log.info("消费者接收消息。。。");
  14. log.info("msg={}",map);
  15. }
  16. }

在这里插入图片描述
看下消息已经被消费了在这里插入图片描述
关闭消费者rabbitmq-consumer,不让进行消费,因为演示主题交换机需要看到效果

2.演示主题交换机:Topic Exchange

同理我们先来制造消息
在生产者rabbitmq-provider中的rabbitmq包下创建主题交换机配置类RabbitTopicConfig

  1. package com.zking.rabbitmqprovider.rabbitmq;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /* * 功能描述: <br> * 〈〉主题交换器 * @Param: * @Return: * @Author: 骄傲的骨傲天 * @Date: 2019/12/23 23:33 */
  9. @Configuration
  10. public class RabbitTopicConfig {
  11. @Bean
  12. public Queue queueX(){
  13. return new Queue("queue-x",true);
  14. }
  15. @Bean
  16. public Queue queueY(){
  17. return new Queue("queue-y",true);
  18. }
  19. @Bean
  20. public Queue queueZ(){
  21. return new Queue("queue-z",true);
  22. }
  23. @Bean
  24. public TopicExchange topicExchange(){
  25. return new TopicExchange("topic-exchange");
  26. }
  27. @Bean
  28. public Binding bindingX(){
  29. return BindingBuilder.bind(queueX()).to(topicExchange())
  30. .with("topic.person.xxx");
  31. }
  32. @Bean
  33. public Binding bindingY(){
  34. return BindingBuilder.bind(queueY()).to(topicExchange())
  35. .with("topic.person.yyy");
  36. }
  37. @Bean
  38. public Binding bindingZ(){
  39. return BindingBuilder.bind(queueZ()).to(topicExchange())
  40. .with("topic.person.*");
  41. }
  42. }

可以看到我创建了三个消息队列:queue-x queue-y queue-z
注意看bindingZ方法的路由键规则topic.person.*
主题交换机的routing_key需要有一定的规则,交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中
*表示一个单词
#表示任意数量(零个或多个)单词。

然后再controller包下的SendController类中添加senderByTopic方法(演示直连交换机所创建的)

  1. @RequestMapping("/senderByTopic")
  2. public Map<String,Object> senderByTopic(String routingKey){
  3. Map<String,Object> map=new HashMap<String,Object>();
  4. Map<String, Object> data = this.createData();
  5. log.info("生产者发送消息,directExchange={},routingkey={}",
  6. "topic-exchange",routingKey);
  7. log.info("msg={}",data);
  8. //第一个参数:交换机的名称
  9. //第二个参数:路由键
  10. //第三个参数:数据
  11. rabbitTemplate.convertAndSend("topic-exchange",
  12. routingKey,data);
  13. map.put("code",1);
  14. map.put("msg","消息发送成功");
  15. return map;
  16. }

此方法中有个routingKey,这就是路由键,是活的,从前端传过的

重启rabbitmq-provider,使用postman测试一下
在这里插入图片描述
结果:
在这里插入图片描述
可以发现我的路由键参数为:topic.person.yyy 同时匹配queue-yqueue-z的规则,所以两个消息队列中都有消息

往消费者rabbitmq-consumer写入代码,消费此两条消息
rabbitmq包下创建三个类:RabbitTopicReceiverX RabbitTopicReceiverY RabbitTopicReceiverZ

  1. package com.zking.rabbitmqconsumer.rabbitmq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Map;
  7. @Component
  8. @RabbitListener(queues = { "queue-x"})
  9. @Slf4j
  10. public class RabbitTopicReceiverX {
  11. @RabbitHandler
  12. public void handlerMessage(Map<String,Object> map){
  13. log.info("消费者接收消息。。。");
  14. log.info("RabbitTopicReceiverX.handlermessage={}",map);
  15. }
  16. }
  17. package com.zking.rabbitmqconsumer.rabbitmq;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  20. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  21. import org.springframework.stereotype.Component;
  22. import java.util.Map;
  23. @Component
  24. @RabbitListener(queues = { "queue-y"})
  25. @Slf4j
  26. public class RabbitTopicReceiverY {
  27. @RabbitHandler
  28. public void handlerMessage(Map<String,Object> map){
  29. log.info("消费者接收消息。。。");
  30. log.info("RabbitTopicReceiverY.handlermessage={}",map);
  31. }
  32. }
  33. package com.zking.rabbitmqconsumer.rabbitmq;
  34. import lombok.extern.slf4j.Slf4j;
  35. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  36. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  37. import org.springframework.stereotype.Component;
  38. import java.util.Map;
  39. @Component
  40. @RabbitListener(queues = { "queue-z"})
  41. @Slf4j
  42. public class RabbitTopicReceiverZ {
  43. @RabbitHandler
  44. public void handlerMessage(Map<String,Object> map){
  45. log.info("消费者接收消息。。。");
  46. log.info("RabbitTopicReceiverZ.handlermessage={}",map);
  47. }
  48. }

运行消费者
在这里插入图片描述
在这里插入图片描述
可以看到 y z消息队列已经被消费了

3.演示扇形交换机
扇形交换机是与路由键无关的
首先在生产者rabbitmq-provider下的rabbitmq中创建扇形交换机的配置类RabbitFanoutConfig

  1. package com.zking.rabbitmqprovider.rabbitmq;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import javax.validation.executable.ValidateOnExecution;
  9. /* * 功能描述: <br> * 〈〉扇形交换机,用于广播消息,于路由键无关 * @Param: * @Return: * @Author: 骄傲的骨傲天 * @Date: 2019/12/23 23:32 */
  10. @Configuration
  11. public class RabbitFanoutConfig {
  12. @Bean
  13. public Queue queueA(){
  14. return new Queue("queue-a",true);
  15. }
  16. @Bean
  17. public Queue queueB(){
  18. return new Queue("queue-b",true);
  19. }
  20. @Bean
  21. public Queue queueC(){
  22. return new Queue("queue-c",true);
  23. }
  24. @Bean
  25. public FanoutExchange fanoutExchange(){
  26. return new FanoutExchange("fanout-exchange");
  27. }
  28. @Bean
  29. public Binding bindingA(){
  30. return BindingBuilder.bind(queueA()).to(fanoutExchange());
  31. }
  32. @Bean
  33. public Binding bindingB(){
  34. return BindingBuilder.bind(queueB()).to(fanoutExchange());
  35. }
  36. @Bean
  37. public Binding bindingC(){
  38. //可以看到这里不需要with
  39. return BindingBuilder.bind(queueC()).to(fanoutExchange());
  40. }
  41. }

controller包中的SendController中添加方法

  1. @RequestMapping("/senderByFanout")
  2. public Map<String,Object> senderByFanout(){
  3. Map<String,Object> map=new HashMap<String,Object>();
  4. Map<String, Object> data = this.createData();
  5. log.info("生产者发送消息,fanoutExchange={}",
  6. "fanout-exchange");
  7. log.info("msg={}",data);
  8. //第一个参数:交换机的名称
  9. //第二个参数:路由键
  10. //第三个参数:数据
  11. rabbitTemplate.convertAndSend("fanout-exchange",
  12. null,data);
  13. map.put("code",1);
  14. map.put("msg","消息发送成功");
  15. return map;
  16. }

可以看到rabbitTemplate.convertAndSend("fanout-exchange", null,data);路由键是null
因此,此方法发送消息所有与fanout-exchange交换机关联的消息队列都会被发送消息

运行生产者rabbitmq-provider
postman测试一下
在这里插入图片描述
在这里插入图片描述
三个消息队列都有消息

往消费者rabbitmq-consumer写入代码,消费消息
rabbitmq包下写入三个类:RabbitFanoutReceiverA RabbitFanoutReceiverB RabbitFanoutReceiverC

  1. package com.zking.rabbitmqconsumer.rabbitmq;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Map;
  7. @Component
  8. @RabbitListener(queues = { "queue-a"})
  9. @Slf4j
  10. public class RabbitFanoutReceiverA {
  11. @RabbitHandler
  12. public void handlerMessage(Map<String,Object> map){
  13. log.info("消费者接收消息。。。");
  14. log.info("RabbitTopicReceiverA.handlermessage={}",map);
  15. }
  16. }
  17. package com.zking.rabbitmqconsumer.rabbitmq;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  20. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  21. import org.springframework.stereotype.Component;
  22. import java.util.Map;
  23. @Component
  24. @RabbitListener(queues = { "queue-b"})
  25. @Slf4j
  26. public class RabbitFanoutReceiverB {
  27. @RabbitHandler
  28. public void handlerMessage(Map<String,Object> map){
  29. log.info("消费者接收消息。。。");
  30. log.info("RabbitTopicReceiverB.handlermessage={}",map);
  31. }
  32. }
  33. package com.zking.rabbitmqconsumer.rabbitmq;
  34. import lombok.extern.slf4j.Slf4j;
  35. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  36. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  37. import org.springframework.stereotype.Component;
  38. import java.util.Map;
  39. @Component
  40. @RabbitListener(queues = { "queue-c"})
  41. @Slf4j
  42. public class RabbitFanoutReceiverC {
  43. @RabbitHandler
  44. public void handlerMessage(Map<String,Object> map){
  45. log.info("消费者接收消息。。。");
  46. log.info("RabbitTopicReceiverC.handlermessage={}",map);
  47. }
  48. }

启动消费者
在这里插入图片描述
在这里插入图片描述
效果演示完毕!!!
给大家看下项目最终的结构
在这里插入图片描述

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,62人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Rabbitmq死信交换机

    正常的Rabbitmq流程是生产者把消息先到交换机,交换机分发到队列,然后消费者从队列中取出消息 死信交换机就是给消息设置过期时间TTL,然后将正常的队列绑定死信交换机,死信

    相关 02RabbitMQ交换机

    一、 Exchange(交换机)的作用 在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列, 队列再

    相关 RabbitMQExchange交换机

    RabbitMQ中的Exchange的作用 消息队列,消息通过发送和Exchange之后最终到达的地方,到达Queue的消息及进入了等待消费的状态。每个消息都会被发送到一