activeMQ安装与使用,以及与Spring整合

朱雀 2022-05-29 02:27 248阅读 0赞

该文章基于《Spring源码深度解析》撰写,感谢郝佳老师的奉献

ActiveMQ Demo

JMS作为JavaEE的规范之一,它的实现产品ActiveMQ一直占有极高的市场份额,下面通过一个小Demo来展示ActiveMQ的用法:
首先需要下载ActiveMQ,然后进入bin目录,选择32位系统或者64位系统下的activemq.bat进行运行,然后将activeMQ目录下的lib中的jar文件导入到我们的项目中,接下来我们开始编写发送端和接收端,该工程的架构如下
![这里写图片描述][Image 1]:

  1. /*Reciver.java*/
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import javax.jms.*;
  4. public class Reciver {
  5. public static void main(String[] args) throws JMSException {
  6. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
  7. Connection connection = connectionFactory.createConnection();
  8. connection.start();
  9. final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
  10. Destination destination = session.createQueue("my-queue");
  11. MessageConsumer consumer = session.createConsumer(destination);
  12. int i = 0;
  13. while(i < 3){
  14. i++;
  15. TextMessage message = (TextMessage) consumer.receive();
  16. session.commit();
  17. System.out.println("收到消息"+message.getText());
  18. }
  19. session.close();
  20. connection.close();
  21. }
  22. }
  23. /*Sender.java*/
  24. import org.apache.activemq.ActiveMQConnectionFactory;
  25. import javax.jms.*;
  26. public class Sender {
  27. public static void main(String[] args) throws JMSException, InterruptedException {
  28. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
  29. Connection connection = connectionFactory.createConnection();
  30. Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
  31. Destination destination = session.createQueue("my-queue");
  32. MessageProducer producer = session.createProducer(destination);
  33. for(int i = 0; i < 3; i++){
  34. TextMessage message = session.createTextMessage("大家好这是个测试");
  35. Thread.sleep(1000);
  36. producer.send(message);
  37. }
  38. session.commit();
  39. session.close();
  40. connection.close();
  41. // System.out.println("good");
  42. }
  43. }

先运行Sender的函数,在运行Receiver,就可以得到如下结果:
![结果][Image 1]

Spring整合ActiveMQ

项目架构如下:
![工程架构][Image 1]
其中各个文件的代码如下

  1. <!--applicationContext-ActiveMQ.xml-->
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
  4. <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  5. <property name="brokerURL">
  6. <value>tcp://localhost:61616</value>
  7. </property>
  8. </bean>
  9. <bean id="jmstemplate" class="org.springframework.jms.core.JmsTemplate">
  10. <property name="connectionFactory">
  11. <ref bean="connectionFactory"/>
  12. </property>
  13. </bean>
  14. <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
  15. <constructor-arg index="0">
  16. <value>HelloWorldQueue</value>
  17. </constructor-arg>
  18. </bean>
  19. </beans>
  20. <!--web.xml-->
  21. <?xml version="1.0" encoding="UTF-8"?>
  22. <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" version="3.0">
  23. <display-name>Archetype Created Web Application</display-name>
  24. <listener>
  25. <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  26. </listener>
  27. </web-app>
  28. /*HelloWorldSender.java*/
  29. package Sender;
  30. import org.springframework.context.ApplicationContext;
  31. import org.springframework.context.support.ClassPathXmlApplicationContext;
  32. import org.springframework.jms.core.JmsTemplate;
  33. import javax.jms.Destination;
  34. import java.io.File;
  35. public class HelloWorldSender {
  36. public static void main(String[] args){
  37. ApplicationContext context = new ClassPathXmlApplicationContext(new String[]{
  38. "applicationContext-ActiveMQ.xml"});
  39. File file = new File("/");
  40. System.out.println(file.getAbsolutePath());
  41. for(String str :context.getBeanDefinitionNames()){
  42. System.out.println(str);
  43. }
  44. JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmstemplate");
  45. Destination destination = (Destination) context.getBean("destination");
  46. jmsTemplate.send(destination, session -> session.createTextMessage("大家好这是个测试"));
  47. }
  48. }
  49. /*HelloWorldReciver.java*/
  50. package Reciver;
  51. import org.springframework.context.ApplicationContext;
  52. import org.springframework.context.support.ClassPathXmlApplicationContext;
  53. import org.springframework.jms.core.JmsTemplate;
  54. import javax.jms.Destination;
  55. import javax.jms.JMSException;
  56. import javax.jms.TextMessage;
  57. public class HelloWorldReciver {
  58. public static void main(String[] args) throws JMSException {
  59. ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-ActiveMQ.xml");
  60. JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmstemplate");
  61. Destination destination = (Destination) context.getBean("destination");
  62. TextMessage msg = (TextMessage)jmsTemplate.receive(destination);
  63. System.out.println("recived msg is:"+msg.getText());
  64. }
  65. }

运行规则同单独使用activeMQ相同,所以此处不再赘述。
上面方法的问题在于jmsTemplate.receive(destination);只能接受一次消息,如果未接收到消息,则会一直等到。通过消息监听器可以更好的处理这个问题,下面为工程的架构
![activeMQ监听器][Image 1]
下面给出关于源代码

  1. /*MyMessageListener.java*/
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.MessageListener;
  5. import javax.jms.TextMessage;
  6. public class MyMessageListener implements MessageListener{
  7. @Override
  8. public void onMessage(Message message) {
  9. TextMessage msg = (TextMessage)message;
  10. try {
  11. System.out.println(msg.getText());
  12. } catch (JMSException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }
  17. /*HelloWorldSender.java*/
  18. package Sender;
  19. import org.springframework.context.ApplicationContext;
  20. import org.springframework.context.support.ClassPathXmlApplicationContext;
  21. import org.springframework.jms.core.JmsTemplate;
  22. import javax.jms.Destination;
  23. import java.io.File;
  24. public class HelloWorldSender {
  25. public static void main(String[] args){
  26. ApplicationContext context = new ClassPathXmlApplicationContext(new String[]{
  27. "applicationContext-ActiveMQ.xml"});
  28. JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmstemplate");
  29. Destination destination = (Destination) context.getBean("destination");
  30. jmsTemplate.send(destination, session -> session.createTextMessage("大家好这是个测试"));
  31. }
  32. }
  33. <!--applicationContext-ActiveMQ.xml-->
  34. <?xml version="1.0" encoding="UTF-8"?>
  35. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
  36. <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  37. <property name="brokerURL">
  38. <value>tcp://localhost:61616</value>
  39. </property>
  40. </bean>
  41. <bean id="jmstemplate" class="org.springframework.jms.core.JmsTemplate">
  42. <property name="connectionFactory">
  43. <ref bean="connectionFactory"/>
  44. </property>
  45. </bean>
  46. <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
  47. <constructor-arg index="0">
  48. <value>HelloWorldQueue</value>
  49. </constructor-arg>
  50. </bean>
  51. <bean id="myTextListener" class="Listener.MyMessageListener"/>
  52. <bean id="javaConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  53. <property name="connectionFactory" ref="connectionFactory"/>
  54. <property name="destination" ref="destination"/>
  55. <property name="messageListener" ref="myTextListener"/>
  56. </bean>
  57. </beans>

运行上面的代码,将会出现下面的结果
![结果][Image 1]

[Image 1]:

发表评论

表情:
评论列表 (有 0 条评论,248人围观)

还没有评论,来说两句吧...

相关阅读