spring整合Activemq入门实战详细介绍二

曾经终败给现在 2024-04-19 11:05 104阅读 0赞

本篇是基于jar包的方式整合,非maven方式,如需了解maven方式,请查看博主的Activemq入门实战详细介绍一,是采用maven方式写的。

MQ
首先简单的介绍一下MQ,MQ英文名MessageQueue,中文名也就是大家用的消息队列,干嘛用的呢,说白了就是一个消息的接受和转发的容器,可用于消息推送。

ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

下载ActiveMQ ,这里是基于windows 版本的案例(实际生产还是

官方网站:http://activemq.apache.org/

下载解压得到如下图所示:

以上的目录结构说明:

bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps用于存放项目的目录
启动ActiveMQ
我们了解activemq的基本目录,下面我们运行一下activemq服务,进入bin目录下的activemq.bat脚本文件或运行自己电脑版本下的activemq.bat,就可以看下图的效果。

è¿éåå¾çæè¿°

监控
ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。
在浏览器中输入:http://127.0.0.1:8161/admin/

用户名和密码都是admin

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzMwNzY0OTkx_size_16_color_FFFFFF_t_70

停止服务器,只需要按着Ctrl+Shift+C,之后输入y即可。

=====================================================================

首先,这边把博主的项目结构图如下:这样应该比较清晰吧,如果还是不明白的话,博主可以上传代码,但觉得代码又太简单了,你跟着博主的代码也能跑通的,加油吧!两个项目其实与实际企业开发的流程是差不多的,一个项目调用另一个项目的数据。如果还是不明白的话,请给博主留言,如果是大神的话,您就可以直接无视此项目,毕竟博主目前技术水平不咋的,谢谢。
————————————————

两个项目结构:activemq-producer 发送消息队列的项目,activemq-consumer 接收消费队列的项目。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzMwNzY0OTkx_size_16_color_FFFFFF_t_70 1

具体步骤:1:首先创建activemq-producer与activemq-consumer这两个项目。

2.然后导入依赖需要的jar包:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzMwNzY0OTkx_size_16_color_FFFFFF_t_70 2

3:配置生产方的配置:applicationContext-activemq.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
  4. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  5. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
  7. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
  8. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
  9. http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
  10. <!-- ActiveMQConnectionFactory 工厂连接 -->
  11. <bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
  12. <!--activemq的链接地址 -->
  13. <property name="brokerURL" value="tcp://127.0.0.1:61616"></property>
  14. </bean>
  15. <!-- 通用的connectionfacotry 指定真正使用的连接工厂 ,如其他RibbitMQ,kafka,RiketMQ等,到时候只需要修改class指定的工厂-->
  16. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  17. <property name="targetConnectionFactory" ref="targetConnection"></property>
  18. </bean>
  19. <!-- 接收和发送消息时使用的类 模板对象-->
  20. <bean class="org.springframework.jms.core.JmsTemplate">
  21. <property name="connectionFactory" ref="connectionFactory"></property>
  22. </bean>
  23. <!--点对点模式,只能是一个生产者产生一个消息,被一个消费者消费。 -->
  24. <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
  25. <!-- 队列消息名称 -->
  26. <constructor-arg name="name" value="demo-change-queue"></constructor-arg>
  27. </bean>
  28. <!-- 发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。 -->
  29. <!-- <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
  30. 订阅消息名称
  31. <constructor-arg name="name" value="demo-change-topic"></constructor-arg>
  32. </bean> -->
  33. </beans>

4:配置消费方的配置:applicationContext-activemq.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
  4. xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  5. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
  7. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
  8. http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
  9. http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
  10. <!-- ActiveMQConnectionFactory 工厂连接 -->
  11. <bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
  12. <!--activemq的链接地址 -->
  13. <property name="brokerURL" value="tcp://127.0.0.1:61616"></property>
  14. </bean>
  15. <!-- 通用的connectionfacotry 指定真正使用的连接工厂 ,如其他RibbitMQ,kafka,RiketMQ等,到时候只需要修改class指定的工厂-->
  16. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  17. <property name="targetConnectionFactory" ref="targetConnection"></property>
  18. </bean>
  19. <!-- 接收和发送消息时使用的类 模板对象-->
  20. <bean class="org.springframework.jms.core.JmsTemplate">
  21. <property name="connectionFactory" ref="connectionFactory"></property>
  22. </bean>
  23. <!--点对点模式,只能是一个生产者产生一个消息,被一个消费者消费。 -->
  24. <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
  25. <constructor-arg name="name" value="item-change-queue"></constructor-arg>
  26. </bean>
  27. <!-- 发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。 -->
  28. <!-- <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
  29. 订阅名称
  30. <constructor-arg name="name" value="demo-change-topic"></constructor-arg>
  31. </bean> -->
  32. <!-- 监听器 -->
  33. <bean id="myMessageListener" class="com.learn.activemq.MyMessageListener"></bean>
  34. <!-- 监听容器,作用:启动线程做监听 -->
  35. <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  36. <property name="connectionFactory" ref="connectionFactory"></property>
  37. <!-- 这里ref需要对应相应的模式进行修改-->
  38. <property name="destination" ref="queueDestination"></property>
  39. <property name="messageListener" ref="myMessageListener"></property>
  40. </bean>
  41. <!-- <bean id="myMessageListener2" class="com.learn.activemq.MyMessageListener"></bean>
  42. 监听容器,作用:启动线程做监听
  43. <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  44. <property name="connectionFactory" ref="connectionFactory"></property>
  45. <property name="destination" ref="topicDestination"></property>
  46. <property name="messageListener" ref="myMessageListener2"></property>
  47. </bean>
  48. <bean id="myMessageListener3" class="com.learn.activemq.MyMessageListener"></bean>
  49. 监听容器,作用:启动线程做监听
  50. <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  51. <property name="connectionFactory" ref="connectionFactory"></property>
  52. <property name="destination" ref="topicDestination"></property>
  53. <property name="messageListener" ref="myMessageListener3"></property>
  54. </bean> -->
  55. </beans>

5:写生产方的测试:Producer

  1. package com.learn.activemq;
  2. import javax.jms.Destination;
  3. import javax.jms.JMSException;
  4. import javax.jms.Message;
  5. import javax.jms.Session;
  6. import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
  7. import org.junit.Test;
  8. import org.springframework.context.ApplicationContext;
  9. import org.springframework.jms.core.JmsTemplate;
  10. import org.springframework.jms.core.MessageCreator;
  11. public class Producer {
  12. @Test
  13. public void send() throws Exception{
  14. //1.初始化spring容器
  15. ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
  16. //2.获取到jmstemplate的对象
  17. JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
  18. //3.获取destination
  19. Destination destination = (Destination) context.getBean(Destination.class);
  20. //4.发送消息
  21. jmsTemplate.send(destination, new MessageCreator() {
  22. @Override
  23. public Message createMessage(Session session) throws JMSException {
  24. return session.createTextMessage("我是activemq发送者,生产者,请问消费者在吗?");
  25. }
  26. });
  27. Thread.sleep(1000);//休息1000毫秒 =1秒
  28. }
  29. }

6:写消费方的监听器:

  1. package com.learn.activemq;
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.MessageListener;
  5. import javax.jms.TextMessage;
  6. import javax.xml.soap.Text;
  7. public class MyMessageListener implements MessageListener {
  8. @Override
  9. public void onMessage(Message message) {
  10. //获取消息
  11. if(message instanceof TextMessage){
  12. TextMessage textMessage = (TextMessage)message;
  13. String text;
  14. try {
  15. text = textMessage.getText();
  16. System.out.println(text);
  17. } catch (JMSException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. }

7:写消费方的测试:(博主这里队列消息模式测试没有问题,但订阅消息模式测试没有成功,需要再修改)

  1. package com.learn.activemq;
  2. import org.junit.Test;
  3. import org.springframework.context.ApplicationContext;
  4. import org.springframework.context.support.ClassPathXmlApplicationContext;
  5. public class ActivemqConsumerTest {
  6. @Test
  7. public void testQueueConsumer() throws Exception {
  8. //初始化spring容器
  9. ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
  10. //等待
  11. System.in.read();
  12. }
  13. }

启动方式:先启动Producer,然后再运行ActivemqConsumerTest。运行就可看到结果:

20190917191321482.png

最后,博主把代码上传到CSDN中,如果没有弄懂,可以参考下载的代码学习Demo.

发表评论

表情:
评论列表 (有 0 条评论,104人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring 整合 ActiveMQ

    Spring 是J2EE 最重要的框架,ActiveMQ 是Jms的框架,用于两个程序、系统中的异步通信,两者的用途都挺广泛。上一篇博文介绍的是发布-订阅形式,今次以点-点形式