RabbitMQ应用场景与实例

- 日理万妓 2022-01-27 04:13 314阅读 0赞
  • Publisher:是Message的生产者,Publisher这个Clients产生了一些Message。
  • Consumer:Message的消费者,Publisher产生的Message,最终要到达Consumer这个Clients,进行消费。
  • Exchange: 指定消息按什么规则,路由到哪个Queue,Message消息先要到达Exchange,在Server中承担着从Produce接收Message的责任。
  • Queue:到达Exchange的消息,根据制定的规则(Routing key)到达对应的Queue,在Server中承担着装载Message,是Message的容器,等待被消费出去。
  • Routing key:在Exchange和Queue之间隐藏有一条黑线,可以将这条黑线看成是Routing key,Exchange就是根据这些定义好的Routing key将Message送到对应的Queue中去,是Exchange和Queue之间的桥梁。
  • Broker:之前一直不理解这个Broker,其实Broker就是接收和分发消息的应用,也就是说RabbitMQ Server就是Message Broker。
  • VirtualHost: 虚拟主机,一个Broker里可以开有多个VirtualHost,它的作用是用作不同用户的权限分离。
  • Connection:是Publisher/Consumer和Broker之间的TCP连接。断开连接的操作只会在Publisher/Consumer端进行,Broker不会断开连接,除非出现网络故障或者Broker服务出现问题,Broker服务宕了。
  • Connection: Channel:如果每一次访问RabbitMQ就建立一个Connection,那在消息量大的时候建立TCP Connection的开销就会很大,导致的后果就是效率低下。

左边的Client向右边的Client发送消息,流程:
第一:获取Conection
第二:获取Channel
第三:定义Exchange,Queue
第四:使用一个RoutingKey将Queue Binding到一个Exchange上
第五:通过指定一个Exchange和一个RoutingKey来将消息发送到对应的Queue上,
第六:Consumer在接收时也是获取connection,接着获取channel,然后指定一个Queue,到Queue上取消息,它对Exchange,RoutingKey及如何Binding都不关心,到对应的Queue上去取消息就行了。

应用场景

1、异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式
(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

20190525102635259.png

  1. (2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70

  1. 假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.
  2. (3)消息队列 :引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

20190525102844278.png

  1. 由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

2 应用解耦

  1. 场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

20190525102959709.png

这种做法有一个缺点:

  • 当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱^ ^)
  • 订单系统和库存系统高耦合.
    引入消息队列

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 1

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
  • 库存系统:订阅下单的消息,获取下单消息,进行库操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失(马云这下高兴了).

3 流量削峰

流量削峰一般在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:

  • 1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
  • 2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)、、、

20190525103243530.png

1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再做后续处理.

RabbitMQ实例

1、原生API

添加依赖:

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>4.1.0</version>
  5. </dependency>

生产者代码:

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //创建连接工厂
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setUsername("guest");
  6. factory.setPassword("guest");
  7. //设置 RabbitMQ 地址
  8. factory.setHost("localhost");
  9. //建立到代理服务器到连接(TCP连接)
  10. Connection conn = factory.newConnection();
  11. //获得信道(信道复用TCP连接)
  12. Channel channel = conn.createChannel();
  13. //声明交换器
  14. String exchangeName = "hello-exchange";
  15. channel.exchangeDeclare(exchangeName, "direct", true); // "direct"将消息保存到队列的方式
  16. String routingKey = "hola";
  17. //发布消息
  18. byte[] messageBodyBytes = "quit".getBytes();
  19. channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
  20. channel.close();
  21. conn.close();
  22. }
  23. }

消费者代码:

  1. public class Consumer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. ConnectionFactory factory = new ConnectionFactory();
  4. factory.setUsername("guest");
  5. factory.setPassword("guest");
  6. factory.setHost("localhost");
  7. //建立到代理服务器到连接
  8. Connection conn = factory.newConnection();
  9. //获得信道
  10. final Channel channel = conn.createChannel();
  11. //声明交换器
  12. String exchangeName = "hello-exchange";
  13. channel.exchangeDeclare(exchangeName, "direct", true);
  14. //声明队列
  15. String queueName = channel.queueDeclare().getQueue();
  16. String bindingKey = "hola";
  17. //绑定队列,通过键 hola 将队列和交换器绑定起来
  18. channel.queueBind(queueName, exchangeName, bindingKey);
  19. while(true) {
  20. //消费消息
  21. boolean autoAck = false;
  22. String consumerTag = "";
  23. channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
  24. @Override
  25. public void handleDelivery(String consumerTag,
  26. Envelope envelope,
  27. AMQP.BasicProperties properties,
  28. byte[] body) throws IOException {
  29. String routingKey = envelope.getRoutingKey();
  30. String contentType = properties.getContentType();
  31. System.out.println("消费的路由键:" + routingKey);
  32. System.out.println("消费的内容类型:" + contentType);
  33. long deliveryTag = envelope.getDeliveryTag();
  34. //确认消息
  35. channel.basicAck(deliveryTag, false); // 手动确认消息,防止消息丢失
  36. System.out.println("消费的消息体内容:");
  37. String bodyStr = new String(body, "UTF-8");
  38. System.out.println(bodyStr);
  39. }
  40. });
  41. }
  42. }
  43. }

2、RabbitMQ与Spring整合:

application_context_rabbit.xml:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:util="http://www.springframework.org/schema/util"
  6. xmlns:aop="http://www.springframework.org/schema/aop"
  7. xmlns:tx="http://www.springframework.org/schema/tx"
  8. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  9. xmlns:p="http://www.springframework.org/schema/p"
  10. xsi:schemaLocation="
  11. http://www.springframework.org/schema/context
  12. http://www.springframework.org/schema/context/spring-context-3.0.xsd
  13. http://www.springframework.org/schema/beans
  14. http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  15. http://www.springframework.org/schema/util
  16. http://www.springframework.org/schema/util/spring-util-3.0.xsd
  17. http://www.springframework.org/schema/aop
  18. http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
  19. http://www.springframework.org/schema/tx
  20. http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
  21. http://www.springframework.org/schema/rabbit
  22. http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
  23. <context:property-placeholder location="classpath:application.properties"/>
  24. <!-- RabbitMQ start -->
  25. <!-- 连接配置 -->
  26. <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
  27. password="${mq.password}" port="${mq.port}" />
  28. <rabbit:admin connection-factory="connectionFactory"/>
  29. <!-- queue 队列声明 -->
  30. <!--
  31. durable 是否持久化
  32. exclusive 仅创建者可以使用的私有队列,断开后自动删除
  33. auto-delete 当所有消费端连接断开后,是否自动删除队列 -->
  34. <rabbit:queue id="orderinfomation_queue" name="${mq.queue}_orderinfomation" durable="true" auto-delete="false" exclusive="false" />
  35. <rabbit:queue id="payinfomation_queue" name="${mq.queue}_payinfomation" durable="true" auto-delete="false" exclusive="false" />
  36. <!-- 交换机定义 -->
  37. <!--
  38. 交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。
  39. 如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。
  40. direct模式:消息与一个特定的路由器完全匹配,才会转发
  41. topic模式:按模式匹配
  42. -->
  43. <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
  44. <rabbit:bindings>
  45. <!-- 设置消息Queue匹配的pattern (direct模式为key) -->
  46. <rabbit:binding queue="orderinfomation_queue" pattern="orderinfomation"/>
  47. <rabbit:binding queue="payinfomation_queue" pattern="payinfomation"/>
  48. </rabbit:bindings>
  49. </rabbit:topic-exchange>
  50. <bean name="orderinfoService" class="com.youfan.service.impl.OrderinfoService"></bean>
  51. <bean name="payinfoService" class="com.youfan.service.impl..PayinfoService"></bean>
  52. <!-- 配置监听 消费者 -->
  53. <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
  54. <!--
  55. queues 监听队列,多个用逗号分隔
  56. ref 监听器 -->
  57. <rabbit:listener queues="orderinfomation_queue" ref="orderinfoService"/>
  58. <rabbit:listener queues="payinfomation_queue" ref="payinfoService"/>
  59. </rabbit:listener-container>
  60. </beans>

application.properties:

  1. mq.host=127.0.0.1
  2. mq.username=guest
  3. mq.password=guest
  4. mq.port=5672
  5. mq.queue=ms

监听的消费者:

  1. public class PayinfoService implements MessageListener {
  2. public void onMessage(Message message) {
  3. System.out.println("消息消费者 = " + message.toString());
  4. }
  5. }
  6. public class OrderinfoService implements MessageListener {
  7. public void onMessage(Message message) {
  8. System.out.println("消息消费者 = " + message.toString());
  9. }
  10. }

3、Rabbit与SpringBoot整合

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

application.yml中添加RabbitMQ地址:

  1. spring:
  2. rabbitmq:
  3. host: 192.168.56.101
  4. username: leyou
  5. password: leyou
  6. virtual-host: /leyou

监听者:

  1. SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。
  2. @Component
  3. public class Listener {
  4. @RabbitListener(bindings = @QueueBinding(
  5. value = @Queue(value = "spring.test.queue", durable = "true"),
  6. exchange = @Exchange(
  7. value = "spring.test.exchange",
  8. ignoreDeclarationExceptions = "true",
  9. type = ExchangeTypes.TOPIC
  10. ),
  11. key = {"#.#"}))
  12. public void listen(String msg){
  13. System.out.println("接收到消息:" + msg);
  14. }
  15. }
  • @Componet:类上的注解,注册到Spring容器
  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:

    • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:

      • value:这个消费者关联的队列。值是@Queue,代表一个队列
      • exchange:队列所绑定的交换机,值是@Exchange类型
      • key:队列和交换机绑定的RoutingKey

类似listen这样的方法在一个类中可以写多个,就代表多个消费者。

AmqpTemplate

  1. Spring最擅长的事情就是封装,把他人的框架进行封装和整合。SpringAMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0ppblhZYW4_size_16_color_FFFFFF_t_70 2

红框圈起来的是比较常用的3个方法,分别是:

  • 指定交换机、RoutingKey和消息体
  • 指定消息
  • 指定RoutingKey和消息,会向默认的交换机发送消息

测试代码

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(classes = Application.class)
  3. public class MqDemo {
  4. @Autowired
  5. private AmqpTemplate amqpTemplate;
  6. @Test
  7. public void testSend() throws InterruptedException {
  8. String msg = "hello, Spring boot amqp";
  9. this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
  10. // 等待10秒后再结束
  11. Thread.sleep(10000);
  12. }
  13. }

发表评论

表情:
评论列表 (有 0 条评论,314人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RabbitMQ 应用场景

    一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有

    相关 rabbitmq应用场景

    1.异步处理 场景说明:用户注册后,需要发注册邮件和注册短信, 传统的做法有两种 1.串行的方式,注册信息入库,发送邮件,发送短信,返回注册成功提示。 2.并行的方式,注册