activeMQ(二)--spring整合activeMQ
### 工程结构
pom
<!-- Dependencies for the Spring + ActiveMQ demo.
     ${spring-version} is expected to be defined in this POM's <properties> (not shown here). -->
<dependencies>
    <!-- Test-only: JUnit 4 and Spring's test context support -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring-version}</version>
        <scope>test</scope>
    </dependency>
    <!-- ActiveMQ client classes (ActiveMQConnectionFactory, ActiveMQQueue, ActiveMQTopic) -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.13.2</version>
    </dependency>
    <!-- Spring JMS support (JmsTemplate, SingleConnectionFactory, jms namespace).
         The version is declared explicitly so it stays aligned with the other Spring modules;
         without it the build fails unless a parent/BOM manages the version. -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
        <version>${spring-version}</version>
    </dependency>
</dependencies>
配置文件
activemq.xml
<!-- Pick up the @Component classes under com.susq and resolve ${...} from config.properties -->
<context:component-scan base-package="com.susq"/>
<context:property-placeholder location="config.properties"/>

<!-- The vendor-supplied ConnectionFactory that actually creates JMS connections -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${activemq.url}"/>
</bean>

<!-- Spring-side ConnectionFactory that wraps and manages the vendor factory above -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- Delegate: the real factory able to produce JMS connections -->
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>

<!-- JmsTemplate (Spring's helper for sending/receiving messages) in queue mode -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"/>
    <!-- pubSubDomain=false: not publish/subscribe, i.e. queue mode -->
    <property name="pubSubDomain" value="false"/>
</bean>

<!-- JmsTemplate in topic mode -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- Same Spring-managed connectionFactory bean as the queue template -->
    <constructor-arg ref="connectionFactory"/>
    <!-- pubSubDomain=true: publish/subscribe mode -->
    <property name="pubSubDomain" value="true"/>
</bean>

<!-- Queue destination; the constructor argument is the queue name -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="${activemq.queue}"/>
</bean>

<!-- Topic destination; the constructor argument is the topic name -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="${activemq.topic}"/>
</bean>
config.properties
# Broker URL injected into ActiveMQConnectionFactory.brokerURL in activemq.xml
activemq.url=tcp://localhost:61616
# Name of the queue used for the queueDestination bean
activemq.queue=su.queue
# Name of the topic used for the topicDestination bean
activemq.topic=su.topic
log4j.properties
# Root logger: ERROR and above, written to the "console" appender
log4j.rootLogger=ERROR,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
# Layout: timestamp [logger]-[level] message, followed by a line break.
# The pattern must stay on a single line; the date format is HH:mm:ss,SSS.
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%c]-[%p] %m%n
# Log level for this project's own packages
log4j.logger.com.susq=DEBUG
工程代码
JmsTemplate 是spring 提供的简化了同步JMS访问代码的帮助类。文档:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/core/JmsTemplate.html
生产者
package com.susq.service.impl;
import com.susq.service.ProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Destination;
import javax.jms.Session;
@Component("producerService")
@Slf4j
public class ProducerServiceImpl implements ProducerService {
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination, final String message) {
log.debug("---------------生产者发送消息-----------------");
log.debug("---------------生产者发了一个消息:" + message);
/* send()方法第二个参数是MessageCreator类型,它里面只有一个方法,所以他是一个函数式接口,支持lambda表达式 public interface MessageCreator { Message createMessage(Session var1) throws JMSException; } 为了防止“函数是接口” 变成"非函数接口”,我们可以在这个上面加上一个声明@FunctionalInterface, 这样别人就无法在里面添加新的接口函数了。 */
jmsTemplate.send(destination, (Session session) -> session.createTextMessage(message));
}
}
消费者
package com.susq.service.impl;
import com.susq.service.ConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
@Slf4j
@Component("consumerService")
public class ConsumerServiceImpl implements ConsumerService {
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
public void receiveMessage(Destination destination) {
log.debug("------------消费消息--------------");
while (true) {
try {
//使用JMSTemplate接收消息
TextMessage txtmsg = (TextMessage) jmsTemplate.receive(destination);
if (null != txtmsg) {
log.debug("--- 收到消息内容为: " + txtmsg.getText());
} else {
break;
}
} catch (JMSException e) {
log.error("错误 {}", e);
}
}
}
}
测试
package com.susq;
import com.susq.service.ConsumerService;
import com.susq.service.ProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.Destination;
/**
 * Spring-backed integration test: loads {@code activemq.xml} from the classpath and
 * exercises the producer and the consumer against the queue destination.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:activemq.xml")
public class AppTest {

    // The field name doubles as the bean name here, so it must stay "queueDestination".
    @Autowired
    private Destination queueDestination;

    @Autowired
    private ProducerService producerService;

    @Autowired
    private ConsumerService consumerService;

    /** Sends one text message to the queue. */
    @Test
    public void testProducer() {
        producerService.sendMessage(queueDestination, "生产消息");
    }

    /** Runs the consumer's receive loop on the queue. */
    @Test
    public void testConsumer() {
        consumerService.receiveMessage(queueDestination);
    }
}
运行生产者以后,可以在ActiveMQ的Web管控台看到有一条消息在等待消费。对ActiveMQ自带Web管控台不熟悉的,可以参考上一篇 http://blog.csdn.net/u013041642/article/details/79547128
运行消费者
消费者中我们写了一个死循环,使线程持续运行,接收消息。如果不循环,只运行一次,那每次只能消费一条消息。
5. 消息监听器
上面我们是用 jmsTemplate.receive(destination) 来接收消息。而使用消息监听器是一种更优雅的消息接收方式,只需要实现消息监听器接口MessageListener即可。topic监听器写法也一样。
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/** * @author susq * @since 2018-03-15-21:18 */
@Slf4j
@Component("consumerServiceNew")
public class ConsumerServiceNew implements MessageListener {
@Override
public void onMessage(Message message) {
try {
log.debug("ConsumerServiceNew接收到消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
log.debug("接收消息异常,{}", e);
}
}
}
activemq.xml中增加监听容器的配置, 需要设置监听的地址和监听器的bean.
<!-- Queue listener: delivers messages from ${activemq.queue} to the consumerServiceNew bean.
     connection-factory and destination-type are written out with their default values. -->
<jms:listener-container connection-factory="connectionFactory" destination-type="queue">
    <jms:listener destination="${activemq.queue}" ref="consumerServiceNew"/>
</jms:listener-container>

<!-- Topic listener: delivers messages from ${activemq.topic} to the topicConsumer bean.
     NOTE(review): the topicConsumer bean is not shown in this article; it must exist in the context. -->
<jms:listener-container connection-factory="connectionFactory" destination-type="topic">
    <jms:listener destination="${activemq.topic}" ref="topicConsumer"/>
</jms:listener-container>
测试代码
package com.susq;
import com.susq.service.ConsumerService;
import com.susq.service.ProducerService;
import com.susq.service.TopicConsumer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.Destination;
/**
 * Spring-backed integration test: loads {@code activemq.xml} from the classpath and
 * sends one message to the queue and one to the topic.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:activemq.xml")
public class AppTest {

    // Both fields are of type Destination; each field name matches the bean it should
    // receive, so the names must not change.
    @Autowired
    private Destination queueDestination;

    @Autowired
    private Destination topicDestination;

    @Autowired
    private ProducerService producerService;

    /** Sends one text message to the queue destination. */
    @Test
    public void testProducer() {
        producerService.sendMessage(queueDestination, "queue生产消息");
    }

    /** Sends one text message to the topic destination. */
    @Test
    public void testTopicProducer() {
        producerService.sendMessage(topicDestination, "topic生产消息");
    }
}
运行生产者,加载配置文件的时候,我们配好的监听容器同时被加载,所以生产消息和监听消息同时进行。
todo: 队列消息的监听器,加载以后可以一次性接收之前生产的未消费的所有队列消息。但是主题消息的监听器,不能收到监听器加载之前的消息(推测是因为默认的主题订阅是非持久订阅,只能收到订阅生效之后发布的消息)。有待研究。
消息监听器除了MessageListener还有SessionAwareMessageListener和MessageListenerAdapter,后面再总结。
还没有评论,来说两句吧...