SpringBoot整合ActiveMQ

淩亂°似流年 2023-10-11 20:17 13阅读 0赞

目录

一、队列

(一)生产者

(二)消费者

二、发布订阅

(一)Topic生产者

(二)Topic消费者


一、队列

(一)生产者

1.新建Maven工程并设置包名类名

b8296dba24964de7b5b251c1c63c19cb.png

b6cdfc18fc624f958a494b4e04d6304a.png

2.Pom.xml

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>
  11. <!--spring boot整合activemq的jar包-->
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-activemq</artifactId>
  15. <version>2.1.5.RELEASE</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>junit</groupId>
  19. <artifactId>junit</artifactId>
  20. <scope>test</scope>
  21. </dependency>
  22. </dependencies>

3.application.yml

  1. server:
  2. port: 7777 #端口号
  3. spring:
  4. activemq:
  5. broker-url: tcp://193.179.123.10:61616
  6. user: admin
  7. password: admin
  8. jms:
  9. pub-sub-domain: false #false=queue true=topic
  10. #自己定义队列名称
  11. myqueue: boot-activemq-queue

4.configBean

  1. // 让spring管理的注解,相当于spring中在xml 中写了个bean
  2. @Component
  3. // 开启jms适配
  4. @EnableJms
  5. public class ConfigBean {
  6. // 注入配置文件中的 myqueue
  7. @Value("${myqueue}")
  8. private String myQueue ;
  9. @Bean // bean id="" class="…"
  10. public ActiveMQQueue queue(){
  11. return new ActiveMQQueue(myQueue);
  12. }
  13. }

5.Queue_produce

  1. @Component
  2. public class Queue_produce {
  3. @Autowired
  4. private JmsMessagingTemplate jmsMessagingTemplate;
  5. @Autowired
  6. private Queue queue;
  7. public void produceMsg() {
  8. jmsMessagingTemplate.convertAndSend(String.valueOf(queue),"******:"+ UUID.randomUUID().toString().substring(0,6));
  9. }
  10. }

5.BootMqProduceApplication

  1. @SpringBootApplication
  2. public class BootMqProduceApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(BootMqProduceApplication.class, args);
  5. }
  6. }

6.测试类

  1. @SpringBootTest(classes = BootMqProduceApplication.class)
  2. @RunWith(SpringJUnit4ClassRunner.class)
  3. @WebAppConfiguration
  4. public class TestActiveMQ {
  5. @Autowired
  6. private Queue_produce produce;
  7. @Test
  8. public void testsend() throws Exception{
  9. produce.produceMsg();
  10. }
  11. }

测试结果

286576325af4464a9fda1edac5279ae1.png

7.每隔3秒钟往MQ投递消息

(1)修改Queue_produce

  1. @Component
  2. public class Queue_produce {
  3. @Autowired
  4. private JmsMessagingTemplate jmsMessagingTemplate;
  5. @Autowired
  6. private Queue queue;
  7. // public void produceMsg() {
  8. // jmsMessagingTemplate.convertAndSend(String.valueOf(queue),"******:"+ UUID.randomUUID().toString().substring(0,6));
  9. // }
  10. @Scheduled(fixedDelay = 3000L)
  11. public void produceMsgScheduled() {
  12. jmsMessagingTemplate.convertAndSend(String.valueOf(queue),"******:"+ UUID.randomUUID().toString().substring(0,6));
  13. }
  14. }

(2)主启动类增加注解

490b26c7785f45cb8dcd38ee1fd2053d.png

(3) 直接开启主启动类,间隔3秒发消息

054f79ca241d4a938c255da4e691f5c3.png

3a1bc46a083f452cadb313ee1c11b664.png 注意:这里队列的名字变化为queue://boot-activemq-queue

(二)消费者

1.新建Maven工程并设置包名类名

dd4cc40430d84866aacc087b524740db.png

2.Pom.xml文件与前面一致

3.application.yml

  1. server:
  2. port: 8888 #端口号与前面不同,每个微服务有不同的微服务端口号
  3. spring:
  4. activemq:
  5. broker-url: tcp://193.179.123.10:61616
  6. user: admin
  7. password: admin
  8. jms:
  9. pub-sub-domain: false #false=queue true=topic
  10. #监听队列的名称
  11. myqueue: queue://boot-activemq-queue

4.Queue_Consumer

  1. @Component
  2. public class Queue_consumer {
  3. //读取yml文件中的myqueue
  4. @JmsListener(destination = "${myqueue}")
  5. public void receive(TextMessage textMessage) throws JMSException {
  6. System.out.println("***收到消息: "+textMessage.getText());
  7. }
  8. }

5.运行主启动类

e309d6c7f297464b92b7c6b290d3583a.png

二、发布订阅

(一)Topic生产者

1.新建Maven工程并设置包名类名

167b6269ea7b432885f07aed3eb546a3.png

2.Pom.xml文件与前面一致

3.application.properties

  1. server.port=6666
  2. spring.activemq.broker-url=tcp://193.179.123.10:61616
  3. spring.activemq.user=admin
  4. spring.activemq.password=admin
  5. spring.jms.pub-sub-domain=true
  6. mytopic=boot-activemq-topic

4.ConfigBean

  1. @Component
  2. @EnableJms
  3. public class ConfigBean {
  4. @Value("${mytopic}")
  5. public String mytopicName;
  6. @Bean
  7. public ActiveMQTopic topic() {
  8. return new ActiveMQTopic(mytopicName);
  9. }
  10. }

5.Topic_Produce

  1. @Component
  2. public class Topic_Produce {
  3. @Autowired
  4. private JmsTemplate jmsTemplate;
  5. @Autowired
  6. private Topic topic;
  7. @Scheduled(fixedDelay = 3000)
  8. public void produceScheduled() {
  9. jmsTemplate.convertAndSend(topic, "主题消息:"+UUID.randomUUID().toString().substring(0,6));
  10. }
  11. }

6.启动主启动类,每隔3秒发送一条主题

3dc541a0ffea4ab39c69c22135b88d5a.png

(二)Topic消费者

1.新建Maven工程并设置包名类名

7142143939174599b3c1324856dcd30e.png

2.Pom.xml同上

3.application.properties

  1. #等下改为5566模拟两个用户进行监听
  2. server.port=5566
  3. spring.activemq.broker-url=tcp://193.179.123.10:61616
  4. spring.activemq.user=admin
  5. spring.activemq.password=admin
  6. spring.jms.pub-sub-domain=true
  7. mytopic=boot-activemq-topic

4.Topic_Consumer

  1. @Component
  2. public class Topic_Consumer {
  3. @JmsListener(destination = "boot-activemq-topic")
  4. public void receive(TextMessage textMessage) throws JmsException, JMSException {
  5. System.out.println("监听到主题:"+textMessage.getText());
  6. }
  7. }

5.模拟两个订阅者主启动类,先启动订阅者再启动消费者

5566订阅者

4cb853eb703342ec834dac17a127f3c5.png

5555订阅者

41a583d7505c4f06ae2bd8a621e3aa01.png

两个消费者

c98bc1baaec1441e994028a0fa743731.png

发表评论

表情:
评论列表 (有 0 条评论,13人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合ActiveMQ

    前言   ActiveMQ 一个成熟的消息中间件,作用于系统之间的通信,降低模块与模块之间的耦合度。   消息的传递有两种类型: 1. Queue 队列模式:一个生