activeMQ安装与使用,以及与Spring整合
该文章基于《Spring源码深度解析》撰写,感谢郝佳老师的奉献
ActiveMQ Demo
JMS作为JavaEE的规范之一,它的实现产品ActiveMQ一直占有极高的市场份额,下面通过一个小Demo来展示ActiveMQ的用法:
首先需要下载ActiveMQ,然后进入bin目录,选择32位系统或者64位系统下的activemq.bat进行运行,然后将activeMQ目录下的lib中的jar文件导入到我们的项目中,接下来我们开始编写发送端和接收端,该工程的架构如下
![这里写图片描述][Image 1]:
/*Reciver.java*/
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Reciver {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
int i = 0;
while(i < 3){
i++;
TextMessage message = (TextMessage) consumer.receive();
session.commit();
System.out.println("收到消息"+message.getText());
}
session.close();
connection.close();
}
}
/*Sender.java*/
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Sender {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for(int i = 0; i < 3; i++){
TextMessage message = session.createTextMessage("大家好这是个测试");
Thread.sleep(1000);
producer.send(message);
}
session.commit();
session.close();
connection.close();
// System.out.println("good");
}
}
先运行Sender的函数,在运行Receiver,就可以得到如下结果:
![结果][Image 1]
Spring整合ActiveMQ
项目架构如下:
![工程架构][Image 1]
其中各个文件的代码如下
<!--applicationContext-ActiveMQ.xml-->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="jmstemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref bean="connectionFactory"/>
</property>
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0">
<value>HelloWorldQueue</value>
</constructor-arg>
</bean>
</beans>
<!--web.xml-->
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" version="3.0">
<display-name>Archetype Created Web Application</display-name>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
</web-app>
/*HelloWorldSender.java*/
package Sender;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Destination;
import java.io.File;
public class HelloWorldSender {
public static void main(String[] args){
ApplicationContext context = new ClassPathXmlApplicationContext(new String[]{
"applicationContext-ActiveMQ.xml"});
File file = new File("/");
System.out.println(file.getAbsolutePath());
for(String str :context.getBeanDefinitionNames()){
System.out.println(str);
}
JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmstemplate");
Destination destination = (Destination) context.getBean("destination");
jmsTemplate.send(destination, session -> session.createTextMessage("大家好这是个测试"));
}
}
/*HelloWorldReciver.java*/
package Reciver;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
public class HelloWorldReciver {
public static void main(String[] args) throws JMSException {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-ActiveMQ.xml");
JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmstemplate");
Destination destination = (Destination) context.getBean("destination");
TextMessage msg = (TextMessage)jmsTemplate.receive(destination);
System.out.println("recived msg is:"+msg.getText());
}
}
运行规则同单独使用activeMQ相同,所以此处不再赘述。
上面方法的问题在于jmsTemplate.receive(destination);只能接受一次消息,如果未接收到消息,则会一直等到。通过消息监听器可以更好的处理这个问题,下面为工程的架构
![activeMQ监听器][Image 1]
下面给出关于源代码
/*MyMessageListener.java*/
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage)message;
try {
System.out.println(msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
/*HelloWorldSender.java*/
package Sender;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Destination;
import java.io.File;
public class HelloWorldSender {
public static void main(String[] args){
ApplicationContext context = new ClassPathXmlApplicationContext(new String[]{
"applicationContext-ActiveMQ.xml"});
JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmstemplate");
Destination destination = (Destination) context.getBean("destination");
jmsTemplate.send(destination, session -> session.createTextMessage("大家好这是个测试"));
}
}
<!--applicationContext-ActiveMQ.xml-->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="jmstemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref bean="connectionFactory"/>
</property>
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0">
<value>HelloWorldQueue</value>
</constructor-arg>
</bean>
<bean id="myTextListener" class="Listener.MyMessageListener"/>
<bean id="javaConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="myTextListener"/>
</bean>
</beans>
运行上面的代码,将会出现下面的结果
![结果][Image 1]
[Image 1]:
还没有评论,来说两句吧...