Spring Boot——RabbitMQ

谁借莪1个温暖的怀抱¢ 2022-05-16 10:09 262阅读 0赞

转载自https://blog.csdn.net/lyhkmm/article/details/78772919

RabbitMq的介绍

RabbitMq的基本原理可以自行上网查阅,或者点击传送门:RabbitMQ的基本原理。

使用配置

1、老规矩,先在pom.xml中添加相关依赖:

  1. <!--消息队列模块-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

2、在application.properties添加rabbitmq的相关信息:

  1. spring.application.name=spirng-boot-rabbitmq
  2. spring.rabbitmq.host=127.0.0.1
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest

端口、用户名和密码都是默认的,根据自己的实际情况配置,rabbitmq的安装教程网上很多了,这里暂时不介绍,以后有时间补上。

3、配置队列:

  1. package com.lyh.demo;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @Author:linyuanhuang
  7. * @Description:队列配置,队列的名称,发送者和接受者的名称必须一致,否则接收不到消息
  8. * @Date:2017/12/11 14:50
  9. */
  10. @Configuration
  11. public class RabbitMqConfig {
  12. @Bean
  13. public Queue Queue1() {
  14. return new Queue("lyhTest1");
  15. }
  16. }

4、发送者通过Controller类发送消息:

  1. package com.lyh.demo.controller;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import java.util.Date;
  7. @RestController
  8. public class SendController {
  9. @Autowired
  10. private AmqpTemplate amqpTemplate;
  11. @RequestMapping("/send")
  12. public String send(){
  13. String content="Date:"+new Date();
  14. amqpTemplate.convertAndSend("lyhTest1",content);
  15. return content;
  16. }
  17. }

5、创建接受者Receiver1,新建类:

  1. package com.lyh.demo.Receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "lyhTest1")
  7. public class Receiver1 {
  8. @RabbitHandler
  9. public void receiver(String msg){
  10. System.out.println("Test1 receiver1:"+msg);
  11. }
  12. }

6、测试

浏览器访问地址:http://localhost:8080/send,如下图:

Center

终端输出接受的内容:

20171211153744402

查看RabbitMQ的Web客户端http://localhost:15672,需要自己安装RabbitMQ的客户端,可以自己上网查阅相关教程。帐号密码和配置文件一样,如下图:可以在列表里看到之前创建的队列。

20171211155304203

一对多的使用配置

1、一对多,一个发送者发送消息,多个接受者接受同一个消息,添加新的接收者Receiver2:

  1. package com.lyh.demo.Receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. @RabbitListener(queues = "lyhTest1")
  7. public class Receiver2 {
  8. @RabbitHandler
  9. public void receiver(String msg){
  10. System.out.println("Test1 receiver2:"+msg);
  11. }
  12. }

2、发送者循环发送10个消息,在SendController添加一对多发送方法:

  1. @RequestMapping("/multiSend")
  2. public String multiSend(){
  3. StringBuilder times=new StringBuilder();
  4. for(int i=0;i<10;i++){
  5. long time=System.nanoTime();
  6. amqpTemplate.convertAndSend("lyhTest1","第"+i+"次发送的时间:"+time);
  7. times.append(time+"<br>");
  8. }
  9. return times.toString();
  10. }

3、测试,浏览器访问http://localhost:8080/multiSend,如下图:

Center 1

4、终端输出接收数据:

  1. Test1 receiver2:第1次发送的时间:25953655163399
  2. Test1 receiver1:第0次发送的时间:25953641137213
  3. Test1 receiver2:第2次发送的时间:25953655403734
  4. Test1 receiver1:第3次发送的时间:25953655591967
  5. Test1 receiver1:第5次发送的时间:25953655949458
  6. Test1 receiver2:第4次发送的时间:25953655772971
  7. Test1 receiver1:第6次发送的时间:25953656111790
  8. Test1 receiver1:第8次发送的时间:25953656492471
  9. Test1 receiver1:第9次发送的时间:25953656687330
  10. Test1 receiver2:第7次发送的时间:25953656277133

可以看到发送者发送一个消息被多个接收者接收,注意这里的消息只能被消费一次

多对多的使用配置

1、在配置类RabbbitMqConfig添加新的队列名lyhTest2:

  1. @Configuration
  2. public class RabbitMqConfig {
  3. @Bean
  4. public Queue Queue1() {
  5. return new Queue("lyhTest1");
  6. }
  7. @Bean
  8. public Queue Queue2() {
  9. return new Queue("lyhTest2");
  10. }
  11. }

2、修改Receiver2接收队列名为lyhTest2:

  1. @Component
  2. @RabbitListener(queues = "lyhTest2")
  3. //这里的lyhTest2是多对多,如果要测试一对多改成lyhTest1
  4. public class Receiver2 {
  5. @RabbitHandler
  6. public void receiver(String msg){
  7. System.out.println("Test2 receiver2:"+msg);
  8. }
  9. }

3、在SendController添加多对多发送消息的方法:

  1. @RequestMapping("/multi2MultiSend")
  2. public String mutil2MutilSend(){
  3. StringBuilder times=new StringBuilder();
  4. for(int i=0;i<10;i++){
  5. long time=System.nanoTime();
  6. amqpTemplate.convertAndSend("lyhTest1","第"+i+"次发送的时间:"+time);
  7. amqpTemplate.convertAndSend("lyhTest2","第"+i+"次发送的时间:"+time);
  8. times.append(time+"<br>");
  9. }
  10. return times.toString();
  11. }

4、测试,浏览器访问:http://localhost:8080/multi2MultiSend,如下图:

Center 2

5、终端输出接收数据:

  1. Test1 receiver1:第0次发送的时间:27607875773748
  2. Test2 receiver2:第0次发送的时间:27607875773748
  3. Test2 receiver2:第1次发送的时间:27607882272138
  4. Test2 receiver2:第2次发送的时间:27607882429049
  5. Test1 receiver1:第1次发送的时间:27607882272138
  6. Test2 receiver2:第3次发送的时间:27607882594693
  7. Test1 receiver1:第2次发送的时间:27607882429049
  8. Test2 receiver2:第4次发送的时间:27607882897371
  9. Test1 receiver1:第3次发送的时间:27607882594693
  10. Test2 receiver2:第5次发送的时间:27607883163005
  11. Test1 receiver1:第4次发送的时间:27607882897371
  12. Test2 receiver2:第6次发送的时间:27607883319916
  13. Test2 receiver2:第7次发送的时间:27607883489777
  14. Test1 receiver1:第5次发送的时间:27607883163005
  15. Test1 receiver1:第6次发送的时间:27607883319916
  16. Test2 receiver2:第8次发送的时间:27607883957798
  17. Test2 receiver2:第9次发送的时间:27607884305953
  18. Test1 receiver1:第7次发送的时间:27607883489777
  19. Test1 receiver1:第8次发送的时间:27607883957798
  20. Test1 receiver1:第9次发送的时间:27607884305953
  21. 可以看到不同的接收者接收不同发送者发送的消息,消息也可以是实体对象,这里就不做演示。

Topic Exchange的使用配置

Topic Exchange是RabbitMQ中最灵活的一种方式,它能够根据routing_key自由的绑定不同的队列,可以适用绝大部分的项目需求

1、新建RabbitMqTopicConfig配置类:

  1. package com.lyh.demo;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @Author:linyuanhuang
  10. * @Description:Topic Exchange配置类
  11. * @Date:2017/12/11 17:13
  12. */
  13. @Configuration
  14. public class {
  15. //只接一个topic
  16. final static String message = "topic.message";
  17. //接收多个topic
  18. final static String messages = "topic.messages";
  19. @Bean
  20. public Queue queueMessage() {
  21. return new Queue(RabbitMqTopicConfig.message);
  22. }
  23. @Bean
  24. public Queue queueMessages() {
  25. return new Queue(RabbitMqTopicConfig.messages);
  26. }
  27. @Bean
  28. TopicExchange exchange() {
  29. return new TopicExchange("exchange");
  30. }
  31. @Bean
  32. Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
  33. return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
  34. }
  35. @Bean
  36. Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
  37. //这里的#表示零个或多个词。
  38. return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
  39. }

2、在SendController添加发送消息方法:

  1. @RequestMapping("/topicSend1")
  2. public String topicSend1() {
  3. String context = "my topic 1";
  4. System.out.println("发送者说 : " + context);
  5. this.amqpTemplate.convertAndSend("exchange", "topic.message", context);
  6. return context;
  7. }
  8. @RequestMapping("/topicSend2")
  9. public String topicSend2() {
  10. String context = "my topic 2";
  11. System.out.println("发送者说 : " + context);
  12. this.amqpTemplate.convertAndSend("exchange", "topic.messages", context);
  13. return context;

3、创建接收者的方法TopicReceiver1和TopicReceiver2:

TopicReceiver1:

  1. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. @RabbitListener(queues = "topic.message")
  6. public class TopicReceiver1 {
  7. @RabbitHandler
  8. public void process(String msg) {
  9. System.out.println("TopicReceiver1:" + msg);
  10. }
  11. }
  12. TopicReceiver2
  13. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  14. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  15. import org.springframework.stereotype.Component;
  16. @Component
  17. @RabbitListener(queues = "topic.messages")
  18. public class TopicReceiver2 {
  19. @RabbitHandler
  20. public void process(String msg) {
  21. System.out.println("TopicReceiver2 :" + msg);
  22. }

4、测试:

浏览器访问http://localhost:8080/topicSend1,终端输出:

  1. 发送者说 : my topic 1
  2. TopicReceiver1:my topic 1
  3. TopicReceiver2 :my topic 1

浏览器访问http://localhost:8080/topicSend2,终端输出:

  1. 发送者说 : my topic 2
  2. TopicReceiver2 :my topic 2
  3. 5、总结:

这里的Topic Exchange 转发消息主要是根据通配符,队列topic.message只能匹配topic.message的路由。而topic.messages匹配路由规则是topic.#,所以它可以匹配topic.开头的全部路由。而topic.#发送的消息也只能是topic.#的接受者才能接收。

GitHub地址:https://github.com/lyhkmm/spring-boot-examples/tree/master/spring-boot-rabbitmq

发表评论

表情:
评论列表 (有 0 条评论,262人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringSpring 事务

    一、事务 1.1 概念 > 事务是指数据库操作的最小工作单元,是作为单个逻辑工作单元执行的一系列操作,这些操作作为一个整体一起向系统提交,要么都执行,要么都不执行。