RabbitMQ五种消息队列学习(六)--通配符模式(路由类型:Topic)

淩亂°似流年 2022-06-03 03:59 251阅读 0赞

RabbitMQ五种消息队列学习(六)–通配符模式(路由类型:Topic)

标签(空格分隔): RabbitMQ


正如上一篇文章中所描述的一种模式的升级,如果需要监听某个交换机的所有消息的话,可以通过消息队列的形式进行绑定。

队列结构图

这里写图片描述
通过字符串通配符的模式匹配需要分发的消息队列
消息路径:
这里写图片描述

实现:
1、生产者

  1. private final static String EXCHANGE_NAME = "test_exchange_topic";
  2. public static void main(String[] argv) throws Exception {
  3. // 获取到连接以及mq通道
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. // 声明exchange
  7. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  8. // 消息内容
  9. String message = "删除商品 id = 1000";
  10. channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
  11. System.out.println(" [x] Sent '" + message + "'");
  12. channel.close();
  13. connection.close();
  14. }

2、消费者1

  1. private final static String QUEUE_NAME = "test_queue_topic_1";
  2. private final static String EXCHANGE_NAME = "test_exchange_topic";
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
  11. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
  12. // 同一时刻服务器只会发一条消息给消费者
  13. channel.basicQos(1);
  14. // 定义队列的消费者
  15. QueueingConsumer consumer = new QueueingConsumer(channel);
  16. // 监听队列,手动返回完成
  17. channel.basicConsume(QUEUE_NAME, false, consumer);
  18. // 获取消息
  19. while (true) {
  20. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  21. String message = new String(delivery.getBody());
  22. System.out.println(" [x] Received '" + message + "'");
  23. Thread.sleep(10);
  24. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  25. }
  26. }

3、消费者2

  1. private final static String QUEUE_NAME = "test_queue_topic_2";
  2. private final static String EXCHANGE_NAME = "test_exchange_topic";
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
  11. // 同一时刻服务器只会发一条消息给消费者
  12. channel.basicQos(1);
  13. // 定义队列的消费者
  14. QueueingConsumer consumer = new QueueingConsumer(channel);
  15. // 监听队列,手动返回完成
  16. channel.basicConsume(QUEUE_NAME, false, consumer);
  17. // 获取消息
  18. while (true) {
  19. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  20. String message = new String(delivery.getBody());
  21. System.out.println(" [x] Received '" + message + "'");
  22. Thread.sleep(10);
  23. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  24. }
  25. }

4、测试结果
消费者1中,明确了队列需要绑定路由器的哪些Routing Key

  1. // 绑定队列到交换机
  2. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
  3. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

消费者2,因为要接收item下的所有消息,所以绑定为:

  1. // 绑定队列到交换机
  2. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");

所有消费者2可以接收所有消息,消费者1只能接受 update\delete两种消息

发表评论

表情:
评论列表 (有 0 条评论,251人围观)

还没有评论,来说两句吧...

相关阅读