【ActiveMQ】之 SpringBoot 与 ActiveMQ 整合

╰+哭是因爲堅強的太久メ 2023-02-27 15:51 107阅读 0赞

首先,我们创建 SpringBoot 的 Maven 工程,然后配置 pom.xml 文件:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>com.wang</groupId>
  5. <artifactId>boot_mq_queue_producer</artifactId>
  6. <version>1.0-SNAPSHOT</version>
  7. <!-- 配置 spring boot 的父类 -->
  8. <parent>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-parent</artifactId>
  11. <version>2.1.15.RELEASE</version>
  12. <relativePath/> <!-- lookup parent from repository -->
  13. </parent>
  14. <properties>
  15. <!-- 配置编译器 -->
  16. <project.version>1.8</project.version>
  17. </properties>
  18. <!-- 配置 spring boot 依赖包 -->
  19. <dependencies>
  20. <!-- spring boot 的基本三大件 -->
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-starter-web</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.springframework.boot</groupId>
  31. <artifactId>spring-boot-starter-test</artifactId>
  32. </dependency>
  33. <!-- spring 和 activemq 的整合包 -->
  34. <dependency>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-starter-activemq</artifactId>
  37. </dependency>
  38. </dependencies>
  39. <build>
  40. <plugins>
  41. <plugin>
  42. <!-- 构建 Spring boot 的插件 -->
  43. <groupId>org.springframework.boot</groupId>
  44. <artifactId>spring-boot-maven-plugin</artifactId>
  45. </plugin>
  46. </plugins>
  47. </build>
  48. </project>

下面以队列为例,在 resources 下添加我们的 application.yml 文件:

  1. server:
  2. port: 7777
  3. spring:
  4. activemq:
  5. broker-url: tcp://127.0.0.1:61616 # mq 服务器地址
  6. user: admin
  7. password: admin
  8. jms:
  9. pub-sub-domain: false # 是否为主题,false 为 queue,true 为 topic
  10. # 自定义队列名称
  11. myqueue: boot-activemq-queue

接下来添加 Queue 的 Bean 配置类:

  1. @Component
  2. @EnableJms // 开启适配注解
  3. public class ConfigBean {
  4. @Value("${myqueue}")
  5. private String myQueue;
  6. @Bean
  7. public Queue queue() {
  8. return new ActiveMQQueue(myQueue);
  9. }
  10. }

然后就可以在 Producer 中引用 queue 来发送消息了:

  1. @Component
  2. public class QueueProducer {
  3. @Autowired
  4. private JmsMessagingTemplate jmsMessagingTemplate;
  5. @Autowired
  6. private Queue queue;
  7. // 手动发送消息
  8. public void produceMsg() {
  9. String msg = UUID.randomUUID().toString().substring(0, 6);
  10. jmsMessagingTemplate.convertAndSend(queue, "produceMsg : " + msg);
  11. System.out.println("------- produceMsg 发送消息完毕 ------");
  12. }
  13. // 每隔3秒发送一条消息
  14. @Scheduled(fixedDelay = 3000)
  15. public void scheduledProduceMsg() {
  16. String msg = UUID.randomUUID().toString().substring(0, 6);
  17. jmsMessagingTemplate.convertAndSend(queue, "produceMsg : " + msg);
  18. System.out.println("------- scheduledProduceMsg 发送消息完毕 ------");
  19. }
  20. }

为了使间隔发送消息生效,我们需要在 SpringBoot 的启动程序不中添加 @EnableScheduling 注解:

  1. @SpringBootApplication
  2. @EnableScheduling // 开启 scheduling
  3. public class ProducerApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(ProducerApplication.class, args);
  6. }
  7. }

下面我们来创建 Spring Boot 的 Consumer 功能,创建方法和 Producer 基本一致,只需要修改 yaml 文件的微服务端口即可:

  1. server:
  2. port: 8888
  3. spring:
  4. activemq:
  5. broker-url: tcp://127.0.0.1:61616 # mq 服务器地址
  6. user: admin
  7. password: admin
  8. jms:
  9. pub-sub-domain: false # 是否为主题,false 为 queue,true 为 topic
  10. # 自定义队列名称
  11. myqueue: boot-activemq-queue

接下来就可以编写我们的消费者代码了,消费者无需像生产者那样配置 Queue 的 Bean 配置文件,只需要监听消息即可:

  1. @Component
  2. public class Consumer {
  3. // 监听 myqueue 队列的消息
  4. @JmsListener(destination = "${myqueue}")
  5. public void receive(TextMessage textMessage) throws JMSException {
  6. System.out.println("接收消息:" + textMessage.getText());
  7. }
  8. }

这样我们只需要启动 consumer 对应的 SpringBootApplication 程序即可以监听接收来自队列的消息:

  1. @SpringBootApplication
  2. public class ConsumerApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(ConsumerApplication.class);
  5. }
  6. }

主题 Topic 方式

接下来我们演示主题的 springboot 工程,同样我们先创建 Producer 工程,修改 yaml 文件为:

  1. server:
  2. port: 6666
  3. spring:
  4. activemq:
  5. broker-url: tcp://127.0.0.1:61616 # mq 服务器地址
  6. user: admin
  7. password: admin
  8. jms:
  9. pub-sub-domain: true # 设置为主题
  10. # 自定义主题名称
  11. myTopic: boot-activemq-topic

唯一不同的是,我们把 pub-sub-domain 设置为了 true,表示为主题方式,同时修改主题名称。

对于 Producer 我们需要配置对应的 Topic 的 Bean 文件:

  1. @Component
  2. public class ConfigBean {
  3. // Topic 名称
  4. @Value("${myTopic}")
  5. private String myTopic;
  6. @Bean
  7. public Topic topic() {
  8. return new ActiveMQTopic(myTopic);
  9. }
  10. }

然后就是我们的 topic 的 producer 代码:

  1. @Component
  2. @EnableJms // 开启适配注解
  3. public class TopicProducer {
  4. @Autowired
  5. private JmsMessagingTemplate jmsMessagingTemplate;
  6. @Autowired
  7. private Topic topic;
  8. // 定时发送消息
  9. @Scheduled(fixedDelay = 3000)
  10. public void produceTopicMsg() {
  11. String msg = UUID.randomUUID().toString().substring(0, 6);
  12. jmsMessagingTemplate.convertAndSend(topic, "Topic Msg : " + msg);
  13. System.out.println("----- send topic msg : " + msg);
  14. }
  15. }

配置了定时发送之后必要忘了在 SpringBoot 的启动程序加上 @EnableScheduling 注解。

接下来就是我们的 Topic 的 Consumer 项目工程了,工程创建流程还是一样,只需修改我们 Consumer 的 yaml 文件的端口即可:

  1. server:
  2. port: 5555
  3. spring:
  4. activemq:
  5. broker-url: tcp://127.0.0.1:61616 # mq 服务器地址
  6. user: admin
  7. password: admin
  8. jms:
  9. pub-sub-domain: true # 设置为主题
  10. # 自定义主题名称
  11. myTopic: boot-activemq-topic

然后就是编写我们的 Consumer 代码:

  1. @Component
  2. public class TopicConsumer {
  3. // 监听来自 Topic 的消息
  4. @JmsListener(destination = "${myTopic}")
  5. public void receive(TextMessage textMessage) throws JMSException {
  6. System.out.println("接收 topic message :" + textMessage.getText());
  7. }
  8. }

至此,全部配置完毕,对比 Queue,Topic 唯一不同的是需要先启动 Consumer 然后才启动 Producer。

发表评论

表情:
评论列表 (有 0 条评论,107人围观)

还没有评论,来说两句吧...

相关阅读