ActiveMQ入门教程(六)-----Spring整合ActiveMQ

痛定思痛。 2021-09-01 04:54 482阅读 0赞

Spring整合ActiveMQ

Maven修改,添加所需要的Spring支持jar包

  1. // Pom.xml
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.fyh.activeMQ</groupId>
  7. <artifactId>activemq-demo</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <build>
  10. <plugins>
  11. <plugin>
  12. <groupId>org.apache.maven.plugins</groupId>
  13. <artifactId>maven-compiler-plugin</artifactId>
  14. <configuration>
  15. <source>8</source>
  16. <target>8</target>
  17. </configuration>
  18. </plugin>
  19. </plugins>
  20. <resources>
  21. <resource>
  22. <directory>src/main/java</directory>
  23. <includes>
  24. <include>**/*.properties</include>
  25. <include>**/*.xml</include>
  26. </includes>
  27. <filtering>false</filtering>
  28. </resource>
  29. <!-- Include the resources directory explicitly so the Spring configuration files under it are found (in the project's web module) -->
  30. <resource>
  31. <directory>src/main/resources</directory>
  32. <includes>
  33. <include>**/*.properties</include>
  34. <include>**/*.xml</include>
  35. </includes>
  36. <filtering>false</filtering>
  37. </resource>
  38. </resources>
  39. </build>
  40. <properties>
  41. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  42. <!-- Kept in sync with the maven-compiler-plugin source/target above (Java 8) -->
  43. <maven.compiler.source>1.8</maven.compiler.source>
  44. <maven.compiler.target>1.8</maven.compiler.target>
  45. </properties>
  46. <dependencies>
  47. <!-- JARs required by ActiveMQ -->
  48. <dependency>
  49. <groupId>org.apache.activemq</groupId>
  50. <artifactId>activemq-all</artifactId>
  51. <version>5.15.9</version>
  52. </dependency>
  53. <dependency>
  54. <groupId>org.apache.xbean</groupId>
  55. <artifactId>xbean-spring</artifactId>
  56. <version>3.16</version>
  57. </dependency>
  58. <!-- Avoids: Caused by java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper -->
  59. <dependency>
  60. <groupId>com.fasterxml.jackson.core</groupId>
  61. <artifactId>jackson-databind</artifactId>
  62. <version>2.9.5</version>
  63. </dependency>
  64. <!-- JMS support for integrating Spring with ActiveMQ; an explicit version is required because this POM has no parent or dependencyManagement -->
  65. <dependency>
  66. <groupId>org.springframework</groupId>
  67. <artifactId>spring-jms</artifactId>
  68. <version>4.3.23.RELEASE</version>
  69. </dependency>
  70. <!-- Connection pool support for ActiveMQ -->
  71. <dependency>
  72. <groupId>org.apache.activemq</groupId>
  73. <artifactId>activemq-pool</artifactId>
  74. <version>5.15.9</version>
  75. </dependency>
  76. <!-- Spring core, AOP and related JARs -->
  77. <dependency>
  78. <groupId>org.springframework</groupId>
  79. <artifactId>spring-core</artifactId>
  80. <version>4.3.23.RELEASE</version>
  81. </dependency>
  82. <dependency>
  83. <groupId>org.springframework</groupId>
  84. <artifactId>spring-context-support</artifactId>
  85. <version>4.3.23.RELEASE</version>
  86. </dependency>
  87. <dependency>
  88. <groupId>org.springframework</groupId>
  89. <artifactId>spring-aop</artifactId>
  90. <version>4.3.23.RELEASE</version>
  91. </dependency>
  92. <dependency>
  93. <groupId>org.springframework</groupId>
  94. <artifactId>spring-orm</artifactId>
  95. <version>4.3.23.RELEASE</version>
  96. </dependency>
  97. <dependency>
  98. <groupId>org.aspectj</groupId>
  99. <artifactId>aspectjrt</artifactId>
  100. <version>1.6.1</version>
  101. </dependency>
  102. <dependency>
  103. <groupId>org.aspectj</groupId>
  104. <artifactId>aspectjweaver</artifactId>
  105. <version>1.5.4</version>
  106. </dependency>
  107. <dependency>
  108. <groupId>cglib</groupId>
  109. <artifactId>cglib</artifactId>
  110. <version>2.1_2</version>
  111. </dependency>
  112. <!-- Common basics: logging API, Lombok, JUnit -->
  113. <dependency>
  114. <groupId>org.slf4j</groupId>
  115. <artifactId>slf4j-api</artifactId>
  116. <version>1.7.25</version>
  117. </dependency>
  118. <dependency>
  119. <groupId>org.projectlombok</groupId>
  120. <artifactId>lombok</artifactId>
  121. <version>1.16.16</version>
  122. <scope>provided</scope>
  123. </dependency>
  124. <dependency>
  125. <groupId>junit</groupId>
  126. <artifactId>junit</artifactId>
  127. <version>4.12</version>
  128. </dependency>
  129. </dependencies>
  130. </project>

队列(QUEUE)

配置文件 applicationContext.xml

  1. // applicationContext.xml
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <beans xmlns="http://www.springframework.org/schema/beans"
  4. xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
  5. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  6. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  7. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
  8. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
  9. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
  10. http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
  11. <!-- The ConnectionFactory that actually creates Connections; supplied by the JMS provider (ActiveMQ here) -->
  12. <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  13. <property name="brokerURL" value="tcp://localhost:61616" />
  14. </bean>
  15. <!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
  16. <bean id="connectionFactory"
  17. class="org.springframework.jms.connection.SingleConnectionFactory">
  18. <!-- targetConnectionFactory points at the real ConnectionFactory that creates JMS Connections -->
  19. <property name="targetConnectionFactory" ref="targetConnectionFactory" />
  20. </bean>
  21. <!-- Producer configuration -->
  22. <!-- JMS helper class provided by Spring; it can send and receive messages -->
  23. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  24. <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
  25. <property name="connectionFactory" ref="connectionFactory" />
  26. </bean>
  27. <!-- Queue destination (point-to-point) -->
  28. <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
  29. <constructor-arg>
  30. <value>spring-queue</value> <!-- the injected value is the queue name -->
  31. </constructor-arg>
  32. </bean>
  33. <!-- Topic destination (one-to-many) -->
  34. <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
  35. <constructor-arg value="itemAddTopic" />
  36. </bean>
  37. </beans>

代码实现

生产者

  1. @Service
  2. public class SpringMQ_Producer {
  3. public static void main(String[] args) {
  4. //初始化spring容器
  5. ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  6. //从容器中获得JmsTemplate对象。
  7. JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
  8. //从容器中获得一个Destination对象。
  9. Destination destination = (Destination) applicationContext.getBean("queueDestination");
  10. //发送消息
  11. jmsTemplate.send(destination, new MessageCreator() {
  12. @Override
  13. public Message createMessage(Session session) throws JMSException {
  14. return session.createTextMessage("send activemq message");
  15. }
  16. });
  17. System.out.println("***************producer send over");
  18. }
  19. }

消费者

  1. // 消费者
  2. @Service
  3. public class SpringMQ_Consumer {
  4. @Autowired
  5. private JmsTemplate jmsTemplate;
  6. public static void main(String[] args) {
  7. //初始化spring容器
  8. ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  9. //从容器中获得JmsTemplate对象。
  10. JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
  11. //从容器中获得一个Destination对象。
  12. Destination destination = (Destination) applicationContext.getBean("queueDestination");
  13. String retValue = (String)jmsTemplate.receiveAndConvert(destination);
  14. System.out.println("*****消费者收到消息" + retValue);
  15. }
  16. }

主题(Topic)

配置

applicationContext.xml

  1. // applicationContext.xml
  2. <?xml version="1.0" encoding="utf-8" ?>
  3. <beans xmlns="http://www.springframework.org/schema/beans"
  4. xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
  5. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  6. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  7. xsi:schemaLocation="http://www.springframework.org/schema/beans
  8. http://www.springframework.org/schema/beans/spring-beans.xsd
  9. http://www.springframework.org/schema/context
  10. http://www.springframework.org/schema/context/spring-context.xsd
  11. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
  12. http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
  13. <!-- Enable component scanning for the package -->
  14. <context:component-scan base-package="com.fyh.activeMQ"/>
  15. <!-- Producer configuration: pooled connection factory -->
  16. <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
  17. <property name="connectionFactory">
  18. <!-- The ConnectionFactory that actually creates Connections; supplied by the JMS vendor -->
  19. <bean class="org.apache.activemq.ActiveMQConnectionFactory">
  20. <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
  21. </bean>
  22. </property>
  23. <property name="maxConnections" value="100"/>
  24. </bean>
  25. <!-- Topic destination -->
  26. <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
  27. <constructor-arg index="0" value="spring-active-topic"/>
  28. </bean>
  29. <!-- Queue destination (point-to-point); disabled in this topic example -->
  30. <!--<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
  31. <constructor-arg index="0" value="spring-active-queue"/>
  32. </bean>-->
  33. <!-- JMS helper class provided by Spring; it can send and receive messages -->
  34. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  35. <property name="connectionFactory" ref="jmsFactory"/>
  36. <property name="defaultDestination" ref="destinationTopic"/>
  37. <property name="messageConverter">
  38. <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
  39. </property>
  40. </bean>
  41. <!--<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  42. <property name="connectionFactory" ref="jmsFactory"/>
  43. <property name="defaultDestination" ref="destinationQueue"/>
  44. <property name="messageConverter">
  45. <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
  46. </property>
  47. </bean>-->
  48. </beans>

生产者

  1. @Service
  2. public class SpringMQ_Producer {
  3. public static void main(String[] args) {
  4. //初始化spring容器
  5. ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  6. //从容器中获得JmsTemplate对象。
  7. JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
  8. //发送消息
  9. jmsTemplate.send(new MessageCreator() {
  10. @Override
  11. public Message createMessage(Session session) throws JMSException {
  12. return session.createTextMessage("send activemq message");
  13. }
  14. });
  15. System.out.println("***************producer send over");
  16. }
  17. }

消费者

  1. @Service
  2. public class SpringMQ_Consumer {
  3. @Autowired
  4. private JmsTemplate jmsTemplate;
  5. public static void main(String[] args) {
  6. //初始化spring容器
  7. ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  8. //从容器中获得JmsTemplate对象。
  9. JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
  10. String retValue = (String)jmsTemplate.receiveAndConvert();
  11. System.out.println("*****消费者收到消息" + retValue);
  12. }
  13. }

在Spring里面实现不启动消费者,直接通过配置监听器来完成消息的接收

Spring配置文件(在原来的基础上添加)

  1. // applicationContext.xml
  2. <!-- Listener container: receives messages without starting a separate consumer program -->
  3. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  4. <property name="connectionFactory" ref="jmsFactory"/>
  5. <!-- Must reference a destination bean that is actually defined: in the configuration this snippet extends, destinationQueue is commented out and jmsTemplate sends to destinationTopic -->
  6. <property name="destination" ref="destinationTopic"/>
  7. <!-- public class myMessageListener implements MessageListener -->
  8. <property name="messageListener" ref="myMessageListener"/>
  9. </bean>
  10. <!-- Registering the myMessageListener bean. Option 1: declare it explicitly with the bean element below. Option 2: annotate the class with @Component, provided component scanning is declared at the top of the configuration.
  11. <bean id="myMessageListener" class="com.fyh.activeMQ.spring.myMessageListener"/>-->

写一个类来实现消息监听

  1. // myMessageListener.java
  2. /**
  3.  * JMS listener that prints the text of each TextMessage it receives.
  4.  * Messages of any other type are ignored.
  5.  */
  6. @Component
  7. public class myMessageListener implements MessageListener {
  8.     @Override
  9.     public void onMessage(Message message) {
  10.         // instanceof is false for null, so this guard also covers a null message
  11.         if (!(message instanceof TextMessage)) {
  12.             return;
  13.         }
  14.         try {
  15.             String body = ((TextMessage) message).getText();
  16.             System.out.println(body);
  17.         } catch (JMSException e) {
  18.             e.printStackTrace();
  19.         }
  20.     }
  21. }

不用启动消费者,直接启动生产者即可,监听器会随时监听消息

消费者代码不变,且不需要启动

  1. @Service
  2. public class SpringMQ_Consumer {
  3. @Autowired
  4. private JmsTemplate jmsTemplate;
  5. public static void main(String[] args) {
  6. //初始化spring容器
  7. ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  8. //从容器中获得JmsTemplate对象。
  9. JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
  10. String retValue = (String)jmsTemplate.receiveAndConvert();
  11. System.out.println("*****消费者收到消息" + retValue);
  12. }
  13. }

生产者启动

  1. /**
  2.  * Producer: loads applicationContext.xml and sends one text message to the
  3.  * JmsTemplate bean's default destination.
  4.  */
  5. @Service
  6. public class SpringMQ_Producer {
  7.     public static void main(String[] args) {
  8.         // Start the Spring container; it is left open here, so any listener container
  9.         // declared in the same configuration keeps running after the send
  10.         ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  11.         // Builds the text message on the session supplied by the template
  12.         MessageCreator textCreator = session -> session.createTextMessage("send activemq message");
  13.         // Fetch the JmsTemplate bean and send to its default destination
  14.         JmsTemplate template = context.getBean(JmsTemplate.class);
  15.         template.send(textCreator);
  16.         System.out.println("***************producer send over");
  17.     }
  18. }

发表评论

表情:
评论列表 (有 0 条评论,482人围观)

还没有评论,来说两句吧...

相关阅读