RabbitMQ路由模式

墨蓝 2022-05-16 04:39 364阅读 0赞

路由模式

一个生产者,发送消息
每个消费者,都有一个独立的队列

消息发送到交换机,交换机发送到每个队列
根据key,是否相等,来接收消息
这里写图片描述

Send

生产者

  1. package cn.itcast.rabbitmq.routing;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. public class Send {
  6. private final static String EXCHANGE_NAME = "test_exchange_direct";
  7. public static void main(String[] argv) throws Exception {
  8. // 获取到连接以及mq通道
  9. Connection connection = ConnectionUtil.getConnection();
  10. Channel channel = connection.createChannel();
  11. // 声明exchange
  12. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  13. // 消息内容
  14. String message = "删除商品,id=1000";
  15. channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
  16. System.out.println(" 后台系统 '" + message + "'");
  17. channel.close();
  18. connection.close();
  19. }
  20. }

Recv

消费者1

  1. package cn.itcast.rabbitmq.routing;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Recv {
  7. private final static String QUEUE_NAME = "test_queue_direct1";
  8. private final static String EXCHANGE_NAME = "test_exchange_direct";
  9. public static void main(String[] argv) throws Exception {
  10. // 获取到连接以及mq通道
  11. Connection connection = ConnectionUtil.getConnection();
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 绑定队列到交换机
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  18. // 同一时刻服务器只会发一条消息给消费者
  19. channel.basicQos(1);
  20. // 定义队列的消费者
  21. QueueingConsumer consumer = new QueueingConsumer(channel);
  22. // 监听队列,手动返回完成
  23. channel.basicConsume(QUEUE_NAME, false, consumer);
  24. // 获取消息
  25. while (true) {
  26. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  27. String message = new String(delivery.getBody());
  28. System.out.println(" 前台系统: '" + message + "'");
  29. Thread.sleep(10);
  30. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  31. }
  32. }
  33. }

Recv2

消费者2

  1. package cn.itcast.rabbitmq.routing;
  2. import cn.itcast.rabbitmq.util.ConnectionUtil;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Recv2 {
  7. private final static String QUEUE_NAME = "test_queue_direct2";
  8. private final static String EXCHANGE_NAME = "test_exchange_direct";
  9. public static void main(String[] argv) throws Exception {
  10. // 获取到连接以及mq通道
  11. Connection connection = ConnectionUtil.getConnection();
  12. Channel channel = connection.createChannel();
  13. // 声明队列
  14. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  15. // 绑定队列到交换机
  16. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
  17. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  18. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
  19. // 同一时刻服务器只会发一条消息给消费者
  20. channel.basicQos(1);
  21. // 定义队列的消费者
  22. QueueingConsumer consumer = new QueueingConsumer(channel);
  23. // 监听队列,手动返回完成
  24. channel.basicConsume(QUEUE_NAME, false, consumer);
  25. // 获取消息
  26. while (true) {
  27. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  28. String message = new String(delivery.getBody());
  29. System.out.println(" 搜索系统: '" + message + "'");
  30. Thread.sleep(10);
  31. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  32. }
  33. }
  34. }

启动发送者

这里写图片描述

启动消费者

此时,有五个绑定队列

消费者1,两个
消费者2,三个
这里写图片描述

测试

发送消息,依次将key设置为
insert、update、delete

消费者1
前台系统,接收了2条消息
这里写图片描述
消费者2
搜索系统,接收了3条消息
这里写图片描述

发表评论

表情:
评论列表 (有 0 条评论,364人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ:模式

    在路由工作模式中,我们需要配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,由消费...

    相关 rabbitmq(6)模式

    1、模型 路由模式为升级版的订阅模式,增加了消费者选择性接收消息的功能。 每个消费者可以绑定多个routingKey,生产者在发送消息时指定routingKey致使只

    相关 RabbitMQ模式

    路由模式 一个生产者,发送消息 每个消费者,都有一个独立的队列 消息发送到交换机,交换机发送到每个队列 根据key,是否相等,来接收消息 ![这里写图片描述