RabbitMQ详解RabbitMQ的五种模式

超、凢脫俗 2023-06-30 02:25 41阅读 0赞

RabbitMQ详解———RabbitMQ的五种模式

1.简单队列(模式)

上一篇文章末尾的实例给出的代码就是简单模式.

一个生产者对应一个消费者!!!

pom.xml

​ 必须导入RabbitMQ依赖包

  1. <!--RabbitMQ-client-->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>3.6.2</version>
  6. </dependency>

ConnectionUtil.java

  1. package org.alva.Utils;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * <一句话描述>,RabbitMQ的连接工具类
  8. * <详细介绍>,
  9. *
  10. */
  11. public class ConnectionUtil {
  12. public static Connection getConnection(String host, int port, String vhost, String username, String password) throws IOException, TimeoutException {
  13. //1.定义连接工厂
  14. ConnectionFactory connectionFactory = new ConnectionFactory();
  15. //2.设置服务器地址
  16. connectionFactory.setHost(host);
  17. //3.设置端口
  18. connectionFactory.setPort(port);
  19. //4.设置虚拟主机,用户名,密码
  20. connectionFactory.setVirtualHost(vhost);
  21. connectionFactory.setUsername(username);
  22. connectionFactory.setPassword(password);
  23. //5.通过连接工厂获取连接
  24. Connection connection = connectionFactory.newConnection();
  25. return connection;
  26. }
  27. }

Consumer.java

  1. package org.alva.RabbitMQ;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.QueueingConsumer;
  5. import org.alva.Utils.ConnectionUtil;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * <一句话描述>,消费者
  10. * <详细介绍>,
  11. *
  12. */
  13. public class Consumer {
  14. private final static String QUEUE_NAME = "hello";
  15. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  16. //1.获取连接
  17. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
  18. //2.声明通道
  19. Channel channel = connection.createChannel();
  20. //3.声明队列
  21. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  22. //4.定义队列的消费者
  23. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
  24. //5.监听队列
  25. /*
  26. true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息成功消费.
  27. false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
  28. 如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,
  29. 直到该消费者反馈.
  30. */
  31. channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
  32. //6.获取消息
  33. while (true){
  34. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
  35. String message = new String(delivery.getBody());
  36. System.out.println("[x] Received '" + message + "'");
  37. }
  38. }
  39. }

Productor.java

  1. package org.alva.RabbitMQ;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import org.alva.Utils.ConnectionUtil;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * <一句话描述>,生产者
  9. * <详细介绍>,
  10. *
  11. */
  12. public class Producer {
  13. private final static String QUEUE_NAME = "hello";
  14. public static void main(String[] args) throws IOException, TimeoutException {
  15. //1.获取连接
  16. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
  17. //2.声明通道
  18. Channel channel = connection.createChannel();
  19. //3.声明(创建)队列
  20. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
  21. //4.定义消息内容
  22. String message = "hello rabbitmq";
  23. //5.发布消息
  24. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
  25. System.out.println("[x] send'"+message+"'");
  26. //6.关闭通道和连接
  27. channel.close();
  28. connection.close();
  29. }
  30. }

2.work模式

一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!

竞争消费者模式.

  1. 生产者

    1. package org.alva.RabbitMQ.WorkModel;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import org.alva.Utils.ConnectionUtil;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. /**
    8. * <一句话描述>,生产者
    9. * <详细介绍>,Work模式下的生产者
    10. *
    11. */
    12. public class Producter {
    13. public static final String QUEUE_NAME = "work_queue";
    14. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    15. //1.获取连接
    16. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
    17. //2.声明信道
    18. Channel channel = connection.createChannel();
    19. //3.声明(创建)队列
    20. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    21. //4.定义消息内容,发布多条消息
    22. for (int i = 0; i < 10; i++) {
    23. String message = "hello rabbitmq " + i;
    24. //5.发布消息
    25. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    26. System.out.println("[x] send message is '" + message + "'");
    27. //6.模拟发送消息延时,便于展示多个消费者竞争接受消息
    28. Thread.sleep(i * 10);
    29. }
    30. //7.关闭信道
    31. channel.close();
    32. //8.关闭连接
    33. connection.close();
    34. }
    35. }
  2. 消费者

    需要创建两个消费者.

    消费者1:每接收一条消息后休眠10毫秒.

    1. package org.alva.RabbitMQ.WorkModel;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.QueueingConsumer;
    5. import org.alva.Utils.ConnectionUtil;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * <一句话描述>,消费者
    10. * <详细介绍>,Work模式下的消费者
    11. *
    12. */
    13. public class Consumer1 {
    14. public static final String QUEUE_NAME = "work_queue";
    15. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    16. //1.获取连接
    17. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
    18. //2.声明通道
    19. Channel channel = connection.createChannel();
    20. //3.声明队列
    21. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    22. //同一时刻服务器只会发送一条消息给消费者
    23. // channel.basicQos(1);
    24. //4.定义队列的消费者
    25. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    26. //5.监听队列,手动返回完成状态
    27. channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
    28. //6.获取消息
    29. while (true){
    30. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    31. String message = new String(delivery.getBody());
    32. System.out.println("[x] received message : '"+message+"'");
    33. //休眠10毫秒
    34. Thread.sleep(10);
    35. //返回确认状态
    36. channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    37. }
    38. }
    39. }

    消费者2:每接收一条消息后休眠1000毫秒

    1. package org.alva.RabbitMQ.WorkModel;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.QueueingConsumer;
    5. import org.alva.Utils.ConnectionUtil;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * <一句话描述>,
    10. * <详细介绍>,
    11. *
    12. */
    13. public class Consumer2 {
    14. public static final String QUEUE_NAME = "work_queue";
    15. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    16. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
    17. Channel channel = connection.createChannel();
    18. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    19. // channel.basicQos(1);
    20. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    21. channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
    22. while (true){
    23. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    24. String message = new String(delivery.getBody());
    25. System.out.println("[x] received message : '" + message + "'");
    26. Thread.sleep(1000);
    27. channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    28. }
    29. }
    30. }
  3. 测试结果

    1. 首先生产者一次打印从0-9条消息

      image-20180819000057095

    2. 然后是消费者1:结果为打印偶数条消息(注:先启动的消费者为消费者1)

      image-20180819000259211

    3. 消费者2:结果为打印奇数条消息

      image-20180819000335579

    结论:

    ​ **消费者1和消费者2获取到的消息内容是不同的,也就是说同一个消息只能被一个消费者获取.*

    ​ **消费者1和消费者2分别获取奇数条消息和偶数条消息,两种获取消息的条数是一样的.*

    ​ 前面我们说这种模式是竞争消费者模式,一条队列被多个消费者监听,这里两个消费者,其中消费者1和消费者2在获取消息后分别休眠了10毫秒和1000毫秒,也就是说两个消费者获取消息的效率是不一样的,但是结果却是两者获得的消息条数是一样的,这根本不构成竞争关系,那么我们应该怎么办才能让工作效率更高的消费者获取消息更多,也就是消费者1获取消息更多呢?

    1. 能者多劳

      1. channel.basicQos(1);

      增加如上代码,表示同一时刻服务器只会发送一条消息给消费者.消费者1和消费者2获取消息结果如下:

      image-20180819001133009

      image-20180819001145486

    2. 应用场景

      效率高的消费者消费消息多,可以用来进行负载均衡.

3.发布/订阅模式

一个消费者将消息首先发送到交换器,交换器绑定多个队列,然后被监听该队列的消费者所接收并消费.

*在RabbitMQ中,交换器主要有四种类型:direct,fanout,topic,headers,这里的交换器是fanout.

  1. 生产者

    1. package org.alva.RabbitMQ.PublishModel;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import org.alva.Utils.ConnectionUtil;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. /**
    8. * <一句话描述>,发布/订阅模式下的生产者
    9. * <详细介绍>,
    10. *
    11. */
    12. public class Producer {
    13. private final static String EXCHANGE_NAME = "fanout_exchange";
    14. public static void main(String[] args) throws IOException, TimeoutException {
    15. //1.获取连接
    16. Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest");
    17. //2.声明信道
    18. Channel channel = connection.createChannel();
    19. //3.声明交换器
    20. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    21. //4.定义消息内容
    22. String message = "hello rabbitmq";
    23. //5.发布消息
    24. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    25. System.out.println("[x] send'" + message + "'");
    26. //6.关闭通道和连接
    27. channel.close();
    28. connection.close();
    29. }
    30. }
  2. 消费者

    消费者1:

    1. package org.alva.RabbitMQ.PublishModel;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.QueueingConsumer;
    5. import org.alva.Utils.ConnectionUtil;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * <一句话描述>,消费者
    10. * <详细介绍>,发布/订阅模式下的消费者
    11. *
    12. */
    13. public class Consumer1 {
    14. public static final String QUEUE_NAME = "fanout_queue_1";
    15. public static final String EXCHANGE_NAME = "fanout_exchange";
    16. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    17. //1.获取连接
    18. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
    19. //2.声明信道
    20. Channel channel = connection.createChannel();
    21. //3.声明交换器
    22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    23. //4.绑定队列到交换器
    24. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    25. //同一时刻服务器只会发送一条消息给消费者
    26. channel.basicQos(1);
    27. //5.定义队列的消费者
    28. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    29. //5.监听队列,手动返回完成状态
    30. channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
    31. //6.获取消息
    32. while (true) {
    33. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    34. String message = new String(delivery.getBody());
    35. System.out.println("[消费者1] received message : '" + message + "'");
    36. //休眠10毫秒
    37. Thread.sleep(10);
    38. //返回确认状态
    39. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    40. }
    41. }
    42. }

    消费者2:

    1. package org.alva.RabbitMQ.PublishModel;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.QueueingConsumer;
    5. import org.alva.Utils.ConnectionUtil;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * <一句话描述>,
    10. * <详细介绍>,
    11. *
    12. */
    13. public class Consumer2 {
    14. public static final String QUEUE_NAME = "fanout_queue_2";
    15. private final static String EXCHANGE_NAME = "fanout_exchange";
    16. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    17. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
    18. Channel channel = connection.createChannel();
    19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    20. channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
    21. channel.basicQos(1);
    22. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    23. channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
    24. while (true) {
    25. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    26. String message = new String(delivery.getBody());
    27. System.out.println("[消费者2] received message : '" + message + "'");
    28. Thread.sleep(1000);
    29. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    30. }
    31. }
    32. }

    注意:消费者1和消费者2两者监听的队列名称是不一样的.

  3. 测试结果

    image-20180819235254849

    image-20180819235351940

    image-20180819235425091

    消费者1和消费者2都消费了该消息.

    ps:这是因为消费者1和消费者2都监听了被同一个交换器绑定的队列.如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列.

  4. 应用场景:

    比如一个商城系统需要在管理员上传新的商品图片时,前台系统必须更新图片,日志系统必须记录相应的日志,那么就可以将两个队列绑定到图片上传交换器上,一个用于前台系统刚更新图片,另一个用于日志系统记录日志.

4.路由模式

生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费信息.

  1. 生产者

    1. package org.alva.RabbitMQ.DirectExchange;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import org.alva.Utils.ConnectionUtil;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. /**
    8. * <一句话描述>,路由模式下的生产者
    9. * <详细介绍>,
    10. *
    11. */
    12. public class Producer {
    13. private final static String EXCHANGE_NAME = "direct_exchange";
    14. public static void main(String[] args) throws IOException, TimeoutException {
    15. //1.获取连接
    16. Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest");
    17. //2.声明信道
    18. Channel channel = connection.createChannel();
    19. //3.声明交换器,类型为direct
    20. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    21. //4.定义消息内容
    22. String message = "hello rabbitmq";
    23. //5.发布消息
    24. channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes());
    25. System.out.println("[x] send'" + message + "'");
    26. //6.关闭通道和连接
    27. channel.close();
    28. connection.close();
    29. }
    30. }
  2. 消费者

    消费者1:

    1. package org.alva.RabbitMQ.DirectExchange;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.QueueingConsumer;
    5. import org.alva.Utils.ConnectionUtil;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * <一句话描述>,消费者
    10. * <详细介绍>,路由模式下的消费者1
    11. * <p>
    12. * 这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,
    13. * 这样就可以接收到需要接收的消息。
    14. *
    15. */
    16. public class Consumer1 {
    17. public static final String QUEUE_NAME = "direct_queue_1";
    18. public static final String EXCHANGE_NAME = "direct_exchange";
    19. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    20. //1.获取连接
    21. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
    22. //2.声明信道
    23. Channel channel = connection.createChannel();
    24. //3.声明队列
    25. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    26. //4.绑定队列到交换器,指定路由key为update
    27. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
    28. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    29. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "add");
    30. //同一时刻服务器只会发送一条消息给消费者
    31. channel.basicQos(1);
    32. //5.定义队列的消费者
    33. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    34. //5.监听队列,手动返回完成状态
    35. channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
    36. //6.获取消息
    37. while (true) {
    38. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    39. String message = new String(delivery.getBody());
    40. System.out.println("[消费者1] received message : '" + message + "'");
    41. //休眠10毫秒
    42. Thread.sleep(10);
    43. //返回确认状态
    44. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    45. }
    46. }
    47. }

    消费者2:

    1. package org.alva.RabbitMQ.DirectExchange;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.QueueingConsumer;
    5. import org.alva.Utils.ConnectionUtil;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * <一句话描述>,消费者
    10. * <详细介绍>,路由模式下的消费者2
    11. *
    12. */
    13. public class Consumer2 {
    14. public static final String QUEUE_NAME = "direct_queue_2";
    15. public static final String EXCHANGE_NAME = "direct_exchange";
    16. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    17. //1.获取连接
    18. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
    19. //2.声明信道
    20. Channel channel = connection.createChannel();
    21. //3.声明队列
    22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    23. //4.绑定队列到交换器,指定路由key为select
    24. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select");
    25. //同一时刻服务器只会发送一条消息给消费者
    26. channel.basicQos(1);
    27. //5.定义队列的消费者
    28. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    29. //5.监听队列,手动返回完成状态
    30. channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
    31. //6.获取消息
    32. while (true) {
    33. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    34. String message = new String(delivery.getBody());
    35. System.out.println("[消费者1] received message : '" + message + "'");
    36. //休眠10毫秒
    37. Thread.sleep(10);
    38. //返回确认状态
    39. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    40. }
    41. }
    42. }
  3. 测试结果

    生产者发布消息,指定的路由key为update,消费者1绑定队列和交换器时key分别是update/delete/add;消费者2绑定队列和交换器时key时select.

    所以可以猜到生产者发送的消息,只有消费者1能够接收并消费,而消费者2是不能接收的.

    image-20180820101737302

    image-20180820101752244

    image-20180820101805624

  4. 应用场景

    利用消费者能够有选择性的接收消息的特性,比如商场系统的后台管理系统对于商品进行修改、删除、新增操作都需要更新前台系统的界面展示,而查询操作不需要,那么这两个队列分开接收消息就比较好.

5.主题模式

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配.

符号”#“表示匹配一个或多个词,符号”\“表示匹配一个词.*

  1. 生产者

    1. package org.alva.RabbitMQ.TopicExchange;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import org.alva.Utils.ConnectionUtil;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. /**
    8. * <一句话描述>,主题模式下的生产者
    9. * <详细介绍>,
    10. *
    11. */
    12. public class Producer {
    13. private final static String EXCHANGE_NAME = "topic_exchange";
    14. public static void main(String[] args) throws IOException, TimeoutException {
    15. //1.获取连接
    16. Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest");
    17. //2.声明信道
    18. Channel channel = connection.createChannel();
    19. //3.声明交换器,类型为direct
    20. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    21. //4.定义消息内容
    22. String message = "hello rabbitmq";
    23. //5.发布消息
    24. channel.basicPublish(EXCHANGE_NAME, "update.Name", null, message.getBytes());
    25. System.out.println("[x] send'" + message + "'");
    26. //6.关闭通道和连接
    27. channel.close();
    28. connection.close();
    29. }
    30. }
  2. 消费者1

    1. package org.alva.RabbitMQ.TopicExchange;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.QueueingConsumer;
    5. import org.alva.Utils.ConnectionUtil;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * <一句话描述>,消费者
    10. * <详细介绍>,主题模式下的消费者1
    11. *
    12. */
    13. public class Consumer1 {
    14. public static final String QUEUE_NAME = "topic_queue_1";
    15. public static final String EXCHANGE_NAME = "topic_exchange";
    16. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    17. //1.获取连接
    18. Connection connection = ConnectionUtil.getConnection("localhost", 5673, "/", "guest", "guest");
    19. //2.声明信道
    20. Channel channel = connection.createChannel();
    21. //3.声明队列
    22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    23. //4.绑定队列到交换器,指定路由key为update
    24. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update.#");
    25. //同一时刻服务器只会发送一条消息给消费者
    26. channel.basicQos(1);
    27. //5.定义队列的消费者
    28. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    29. //5.监听队列,手动返回完成状态
    30. channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
    31. //6.获取消息
    32. while (true) {
    33. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    34. String message = new String(delivery.getBody());
    35. System.out.println("[消费者1] received message : '" + message + "'");
    36. //休眠10毫秒
    37. Thread.sleep(10);
    38. //返回确认状态
    39. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    40. }
    41. }
    42. }
  3. 消费者2

    1. package org.alva.RabbitMQ.TopicExchange;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.QueueingConsumer;
    5. import org.alva.Utils.ConnectionUtil;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * <一句话描述>,消费者
    10. * <详细介绍>,主题模式下的消费者2
    11. *
    12. * @author 穆国超
    13. * @since 设计wiki | 需求wiki
    14. */
    15. public class Consumer2 {
    16. public static final String QUEUE_NAME = "topic_queue_2";
    17. public static final String EXCHANGE_NAME = "topic_exchange";
    18. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    19. //1.获取连接
    20. Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
    21. //2.声明信道
    22. Channel channel = connection.createChannel();
    23. //3.声明队列
    24. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    25. //4.绑定队列到交换器,指定路由key为select
    26. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select.#");
    27. //同一时刻服务器只会发送一条消息给消费者
    28. channel.basicQos(1);
    29. //5.定义队列的消费者
    30. QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    31. //5.监听队列,手动返回完成状态
    32. channel.basicConsume(QUEUE_NAME, false, queueingConsumer);
    33. //6.获取消息
    34. while (true) {
    35. QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    36. String message = new String(delivery.getBody());
    37. System.out.println("[消费者1] received message : '" + message + "'");
    38. //休眠10毫秒
    39. Thread.sleep(1000);
    40. //返回确认状态
    41. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    42. }
    43. }
    44. }
  4. 分析结果

    生产者发送消息绑定的路由key为update.Name;消费者1监听的队列和交换器绑定路由key为update.#;消费者2监听的队列和交换器绑定路由key为select.#.

    很显然,消费者1会接收到消息,而消费者2接收不到

    image-20180902151433116

    image-20180902151448651

    image-20180902151458185

6.四种交换器

​ 前面介绍了五种队列模式,但是实际上只有三种,第一种简单队列,第二种工作模式,剩下的三种都是和交换器绑定的合起来称为一种,这节详细介绍交换器.

​ 交换器分为四种,分别是:direct,fanout,topic和headers.

​ 前三种分别对应路由模式,发布订阅模式和通配符模式,headers交换器允许匹配AMQP消息的header而非路由键,除此之外,header交换器和direct交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器,这不做详细介绍.

  1. direct

    如果路由键完全匹配的话,消息才会被投放到相应的队列.

  2. fanout

    当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器的上的队列.

  3. topic

    设置模糊的绑定方式,”*“操作符将”.”视为分隔符,匹配单个字符;”#“操作符没有分块的概念,它将任意”.”均视为关键字的匹配部分,能够匹配多个字符.

7.总结

​ 关于RabbitMQ的五种队列,其实实际使用最多的是最后一种主题模式,通过模糊匹配,使得操作更加自如.

发表评论

表情:
评论列表 (有 0 条评论,41人围观)

还没有评论,来说两句吧...

相关阅读