【ActiveMQ】之 SpringBoot 与 ActiveMQ 整合
首先,我们创建 SpringBoot 的 Maven 工程,然后配置 pom.xml 文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wang</groupId>
<artifactId>boot_mq_queue_producer</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 配置 spring boot 的父类 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.15.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<!-- 配置编译器 -->
<project.version>1.8</project.version>
</properties>
<!-- 配置 spring boot 依赖包 -->
<dependencies>
<!-- spring boot 的基本三大件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- spring 和 activemq 的整合包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- 构建 Spring boot 的插件 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
下面以队列为例,在 resources 下添加我们的 application.yml 文件:
server:
port: 7777
spring:
activemq:
broker-url: tcp://127.0.0.1:61616 # mq 服务器地址
user: admin
password: admin
jms:
pub-sub-domain: false # 是否为主题,false 为 queue,true 为 topic
# 自定义队列名称
myqueue: boot-activemq-queue
接下来添加 Queue 的 Bean 配置类:
@Component
@EnableJms // 开启适配注解
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
@Bean
public Queue queue() {
return new ActiveMQQueue(myQueue);
}
}
然后就可以在 Producer 中引用 queue 来发送消息了:
@Component
public class QueueProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 手动发送消息
public void produceMsg() {
String msg = UUID.randomUUID().toString().substring(0, 6);
jmsMessagingTemplate.convertAndSend(queue, "produceMsg : " + msg);
System.out.println("------- produceMsg 发送消息完毕 ------");
}
// 每隔3秒发送一条消息
@Scheduled(fixedDelay = 3000)
public void scheduledProduceMsg() {
String msg = UUID.randomUUID().toString().substring(0, 6);
jmsMessagingTemplate.convertAndSend(queue, "produceMsg : " + msg);
System.out.println("------- scheduledProduceMsg 发送消息完毕 ------");
}
}
为了使间隔发送消息生效,我们需要在 SpringBoot 的启动程序不中添加 @EnableScheduling 注解:
@SpringBootApplication
@EnableScheduling // 开启 scheduling
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
下面我们来创建 Spring Boot 的 Consumer 功能,创建方法和 Producer 基本一致,只需要修改 yaml 文件的微服务端口即可:
server:
port: 8888
spring:
activemq:
broker-url: tcp://127.0.0.1:61616 # mq 服务器地址
user: admin
password: admin
jms:
pub-sub-domain: false # 是否为主题,false 为 queue,true 为 topic
# 自定义队列名称
myqueue: boot-activemq-queue
接下来就可以编写我们的消费者代码了,消费者无需像生产者那样配置 Queue 的 Bean 配置文件,只需要监听消息即可:
@Component
public class Consumer {
// 监听 myqueue 队列的消息
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("接收消息:" + textMessage.getText());
}
}
这样我们只需要启动 consumer 对应的 SpringBootApplication 程序即可以监听接收来自队列的消息:
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}
主题 Topic 方式
接下来我们演示主题的 springboot 工程,同样我们先创建 Producer 工程,修改 yaml 文件为:
server:
port: 6666
spring:
activemq:
broker-url: tcp://127.0.0.1:61616 # mq 服务器地址
user: admin
password: admin
jms:
pub-sub-domain: true # 设置为主题
# 自定义主题名称
myTopic: boot-activemq-topic
唯一不同的是,我们把 pub-sub-domain 设置为了 true,表示为主题方式,同时修改主题名称。
对于 Producer 我们需要配置对应的 Topic 的 Bean 文件:
@Component
public class ConfigBean {
// Topic 名称
@Value("${myTopic}")
private String myTopic;
@Bean
public Topic topic() {
return new ActiveMQTopic(myTopic);
}
}
然后就是我们的 topic 的 producer 代码:
@Component
@EnableJms // 开启适配注解
public class TopicProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
// 定时发送消息
@Scheduled(fixedDelay = 3000)
public void produceTopicMsg() {
String msg = UUID.randomUUID().toString().substring(0, 6);
jmsMessagingTemplate.convertAndSend(topic, "Topic Msg : " + msg);
System.out.println("----- send topic msg : " + msg);
}
}
配置了定时发送之后必要忘了在 SpringBoot 的启动程序加上 @EnableScheduling 注解。
接下来就是我们的 Topic 的 Consumer 项目工程了,工程创建流程还是一样,只需修改我们 Consumer 的 yaml 文件的端口即可:
server:
port: 5555
spring:
activemq:
broker-url: tcp://127.0.0.1:61616 # mq 服务器地址
user: admin
password: admin
jms:
pub-sub-domain: true # 设置为主题
# 自定义主题名称
myTopic: boot-activemq-topic
然后就是编写我们的 Consumer 代码:
@Component
public class TopicConsumer {
// 监听来自 Topic 的消息
@JmsListener(destination = "${myTopic}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("接收 topic message :" + textMessage.getText());
}
}
至此,全部配置完毕,对比 Queue,Topic 唯一不同的是需要先启动 Consumer 然后才启动 Producer。
还没有评论,来说两句吧...