activeMQ(二)--spring整合activeMQ

忘是亡心i 2022-05-29 04:56 469阅读 0赞
  1. ### 工程结构

这里写图片描述

  1. pom

    1. <dependencies>
    2. <dependency>
    3. <groupId>junit</groupId>
    4. <artifactId>junit</artifactId>
    5. <version>4.12</version>
    6. <scope>test</scope>
    7. </dependency>
    8. <dependency>
    9. <groupId>org.springframework</groupId>
    10. <artifactId>spring-context</artifactId>
    11. <version>${spring-version}</version>
    12. </dependency>
    13. <dependency>
    14. <groupId>org.springframework</groupId>
    15. <artifactId>spring-test</artifactId>
    16. <version>${spring-version}</version>
    17. <scope>test</scope>
    18. </dependency>
    19. <dependency>
    20. <groupId>org.apache.activemq</groupId>
    21. <artifactId>activemq-all</artifactId>
    22. <version>5.13.2</version>
    23. </dependency>
    24. <dependency>
    25. <groupId>org.springframework</groupId>
    26. <artifactId>spring-jms</artifactId>
    27. </dependency>
    28. <dependency>
    29. <groupId>org.springframework</groupId>
    30. <artifactId>spring-messaging</artifactId>
    31. <version>${spring-version}</version>
    32. </dependency>
    33. </dependencies>
  2. 配置文件

    activemq.xml

    1. <context:component-scan base-package="com.susq" />
    2. <context:property-placeholder location="config.properties" />
    3. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    4. <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    5. <property name="brokerURL" value="${activemq.url}"/>
    6. </bean>
    7. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    8. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    9. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
    10. <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    11. </bean>
    12. <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等,设置它为队列模式 -->
    13. <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    14. <constructor-arg ref="connectionFactory" />
    15. <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
    16. <property name="pubSubDomain" value="false" />
    17. </bean>
    18. <!-- 设置它为Topic类型 -->
    19. <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    20. <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
    21. <constructor-arg ref="connectionFactory" />
    22. <!-- pub/sub模型(发布/订阅) -->
    23. <property name="pubSubDomain" value="true" />
    24. </bean>
    25. <!--设置队列消息目的地-->
    26. <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    27. <constructor-arg>
    28. <!-- 构造参数的值是队列的名字 -->
    29. <value>${activemq.queue}</value>
    30. </constructor-arg>
    31. </bean>
    32. <!--设置主题消息目的地-->
    33. <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    34. <constructor-arg value="${activemq.topic}"/>
    35. </bean>

    config.properties

    1. activemq.url=tcp://localhost:61616
    2. activemq.queue=su.queue
    3. activemq.topic=su.topic

    log4j.properties

    1. log4j.rootLogger=ERROR,console
    2. log4j.appender.console=org.apache.log4j.ConsoleAppender
    3. log4j.appender.console.layout=org.apache.log4j.PatternLayout
    4. log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%c]-[%p] %m%n
  1. #给自己项目设置日志级别
  2. log4j.logger.com.susq=DEBUG
  1. 工程代码

    JmsTemplate 是spring 提供的简化了同步JMS访问代码的帮助类。文档:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/core/JmsTemplate.html

    生产者

    1. package com.susq.service.impl;
    2. import com.susq.service.ProducerService;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.beans.factory.annotation.Qualifier;
    6. import org.springframework.jms.core.JmsTemplate;
    7. import org.springframework.stereotype.Component;
    8. import javax.jms.Destination;
    9. import javax.jms.Session;
    10. @Component("producerService")
    11. @Slf4j
    12. public class ProducerServiceImpl implements ProducerService {
    13. @Autowired
    14. @Qualifier("jmsQueueTemplate")
    15. private JmsTemplate jmsTemplate;
    16. public void sendMessage(Destination destination, final String message) {
    17. log.debug("---------------生产者发送消息-----------------");
    18. log.debug("---------------生产者发了一个消息:" + message);
    19. /* send()方法第二个参数是MessageCreator类型,它里面只有一个方法,所以他是一个函数式接口,支持lambda表达式 public interface MessageCreator { Message createMessage(Session var1) throws JMSException; } 为了防止“函数是接口” 变成"非函数接口”,我们可以在这个上面加上一个声明@FunctionalInterface, 这样别人就无法在里面添加新的接口函数了。 */
    20. jmsTemplate.send(destination, (Session session) -> session.createTextMessage(message));
    21. }
    22. }

    消费者

    1. package com.susq.service.impl;
    2. import com.susq.service.ConsumerService;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.beans.factory.annotation.Qualifier;
    6. import org.springframework.jms.core.JmsTemplate;
    7. import org.springframework.stereotype.Component;
    8. import javax.jms.Destination;
    9. import javax.jms.JMSException;
    10. import javax.jms.TextMessage;
    11. @Slf4j
    12. @Component("consumerService")
    13. public class ConsumerServiceImpl implements ConsumerService {
    14. @Autowired
    15. @Qualifier("jmsQueueTemplate")
    16. private JmsTemplate jmsTemplate;
    17. public void receiveMessage(Destination destination) {
    18. log.debug("------------消费消息--------------");
    19. while (true) {
    20. try {
    21. //使用JMSTemplate接收消息
    22. TextMessage txtmsg = (TextMessage) jmsTemplate.receive(destination);
    23. if (null != txtmsg) {
    24. log.debug("--- 收到消息内容为: " + txtmsg.getText());
    25. } else {
    26. break;
    27. }
    28. } catch (JMSException e) {
    29. log.error("错误 {}", e);
    30. }
    31. }
    32. }
    33. }

    测试

    1. package com.susq;
    2. import com.susq.service.ConsumerService;
    3. import com.susq.service.ProducerService;
    4. import org.junit.Test;
    5. import org.junit.runner.RunWith;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.test.context.ContextConfiguration;
    8. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    9. import javax.jms.Destination;
    10. /** * Unit test for simple App. */
    11. @RunWith(SpringJUnit4ClassRunner.class)
    12. @ContextConfiguration(locations = "classpath:activemq.xml")
    13. public class AppTest {
    14. @Autowired
    15. private ProducerService producerService;
    16. @Autowired
    17. private Destination queueDestination;
    18. @Autowired
    19. private ConsumerService consumerService;
    20. @Test
    21. public void testProducer() {
    22. producerService.sendMessage(queueDestination, "生产消息");
    23. }
    24. @Test
    25. public void testConsumer() {
    26. consumerService.receiveMessage(queueDestination);
    27. }
    28. }

    运行生产者,以后,可以看到发送了一条消息等待消费,对ActiveMQ自带Web管控台不熟悉的,可以参考上一篇 http://blog.csdn.net/u013041642/article/details/79547128

    这里写图片描述

    运行消费者

    这里写图片描述

​ 消费者中我们写了一个死循环,使线程持续运行,接受消息。如果不循环,只运行一次,那每次只能消费一条消息。

5. 消息监听器

​ 上面我们是用 jmsTemplate.receive(destination) 来接受消息。而使用消息监听器是一种更优雅的消息接受方式,只需要实现消息监听器接口MessageListener即可。topic监听器写法也一样。

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.stereotype.Component;
  3. import javax.jms.JMSException;
  4. import javax.jms.Message;
  5. import javax.jms.MessageListener;
  6. import javax.jms.TextMessage;
  7. /** * @author susq * @since 2018-03-15-21:18 */
  8. @Slf4j
  9. @Component("consumerServiceNew")
  10. public class ConsumerServiceNew implements MessageListener {
  11. @Override
  12. public void onMessage(Message message) {
  13. try {
  14. log.debug("ConsumerServiceNew接收到消息:"+((TextMessage)message).getText());
  15. } catch (JMSException e) {
  16. log.debug("接收消息异常,{}", e);
  17. }
  18. }
  19. }

activemq.xml中增加监听容器的配置, 需要设置监听的地址和监听器的bean.

  1. <!-- 定义Queue监听器 -->
  2. <jms:listener-container>
  3. <jms:listener destination="${activemq.queue}" ref="consumerServiceNew"/>
  4. </jms:listener-container>
  5. <!-- 定义topic监听器 -->
  6. <jms:listener-container destination-type="topic" >
  7. <jms:listener destination="${activemq.topic}" ref="topicConsumer"/>
  8. </jms:listener-container>

测试代码

  1. package com.susq;
  2. import com.susq.service.ConsumerService;
  3. import com.susq.service.ProducerService;
  4. import com.susq.service.TopicConsumer;
  5. import org.junit.Test;
  6. import org.junit.runner.RunWith;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.test.context.ContextConfiguration;
  9. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  10. import javax.jms.Destination;
  11. /** * Unit test for simple App. */
  12. @RunWith(SpringJUnit4ClassRunner.class)
  13. @ContextConfiguration(locations = "classpath:activemq.xml")
  14. public class AppTest {
  15. @Autowired
  16. private ProducerService producerService;
  17. @Autowired
  18. private Destination queueDestination;
  19. @Autowired
  20. private Destination topicDestination;
  21. @Test
  22. public void testProducer() {
  23. producerService.sendMessage(queueDestination, "queue生产消息");
  24. }
  25. @Test
  26. public void testTopicProducer() {
  27. producerService.sendMessage(topicDestination, "topic生产消息");
  28. }
  29. }

运行生产者,加载配置文件的时候,我们配好的监听容器同时被加载,所以生产消息和监听消息同时进行。

todo: 队列消息的监听器,加载以后可以一次性接受之前生产的未消费的所有队列消息。但是主题消息的监听器,不能收到监听器加载之前的消息。有待研究。

消息监听器除了MessageListener还有SessionAwareMessageListener和MessageListenerAdapter,后面再总结。

发表评论

表情:
评论列表 (有 0 条评论,469人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring 整合 ActiveMQ

    Spring 是J2EE 最重要的框架,ActiveMQ 是Jms的框架,用于两个程序、系统中的异步通信,两者的用途都挺广泛。上一篇博文介绍的是发布-订阅形式,今次以点-点形式