ActiveMQ之初体验
一、下载ActiveMQ并在Linux上安装
1.到官网下载,网址为http://activemq.apache.org/components/classic/download/
2.在Linux虚拟机中解压即可,前提需要Java环境,然后进入解压后的bin目录执行./activemq start > /opt/tool/ActiveMQ/run_activemq.log启动ActiveMQ服务,可执行以下命令验证是否启动3.进入http://你的虚拟机的IP地址:8161/admin/topics.jsp即可查看控制台,注意默认账号密码都是admin
二、使用Java程序体验一下消息中间件的作用
1.在IDEA中创建一个springboot项目,然后配置pom.xml文件
<!-- activemq所需要的jar包配置-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
2.1.使用队列方式体验(一对一模式)
(1)Producer(生产者)
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/** * @Description 生产者向ActiveMQ发送消息 * @auther XX * @create XXX */
public class JMSProducer {
private static final String ACTIVEMQ_URL="tcp://192.168.120.131:61616";
private static final String QUEUE_NAME ="myQueue-01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.创建连接并开启
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.获取会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建消息队列
Queue queueMessage = session.createQueue(QUEUE_NAME);
//5.创建生产者n
MessageProducer producer = session.createProducer(queueMessage);
//6.发送消息
for (int i = 1; i <= 7; i++) {
TextMessage textMessage = session.createTextMessage("生产消息"+i);
producer.send(textMessage);
}
//7.关闭连接
producer.close();
session.close();
connection.close();
}
}
(2)Consumer(消费者):
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.TimeUnit;
/** * @Description 消费者向ActiveMQ接受消息 * @auther XX * @create XXX */
public class JMSConsumer {
private static final String ACTIVEMQ_URL="tcp://192.168.120.131:61616";
private static final String QUEUE_NAME ="myQueue-01";
public static void main(String[] args) throws JMSException, InterruptedException {
//1.创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.创建连接并开启
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建需要获取的队列
Queue messageQueue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer consumer = session.createConsumer(messageQueue);
//6.1获取消息(同步方式)
while (true){
TextMessage receive = (TextMessage)consumer.receive();
if (receive!=null){
System.out.println("获取消息:"+receive.getText());
}else{
break;
}
}
//6.2获取消息(异步方式,即监听模式)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage) message;
try {
System.out.println("获取消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//使用监听器方式获取消息有可能一次获取不到,所以得一直监听
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
//7.关闭
consumer.close();
session.close();
connection.close();
}
}
2.2.使用主题方式体验(一对多模式)
(1)Producer(生产者):
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/** * @Description 生产者向ActiveMQ发送消息 * @auther XX * @create XXX */
public class JMSProducer_Topic {
private static final String ACTIVEMQ_URL="tcp://192.168.120.131:61616";
private static final String TOPIC_NAME ="myTopic-01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.创建连接并开启
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建要发送的主题
Topic messageTopic = session.createTopic(TOPIC_NAME);
//5.创建生产者
MessageProducer producer = session.createProducer(messageTopic);
//6.发送消息
for (int i = 1; i <= 6; i++) {
TextMessage textMessage = session.createTextMessage("创建消息:"+i);
producer.send(textMessage);
}
//7.关闭连接
producer.close();
session.close();
connection.close();
}
}
(2)Consumer(消费者):
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.concurrent.TimeUnit;
/** * @Description 消费者向ActiveMQ接受消息 * @auther XX * @create XXX */
public class JMSConsumer_Topic {
private static final String ACTIVEMQ_URL="tcp://192.168.120.131:61616";
private static final String TOPIC_NAME ="myTopic-01";
public static void main(String[] args) throws JMSException, InterruptedException {
//1.创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.创建连接并开启
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建要接受消息的主题
Topic messageTopic = session.createTopic(TOPIC_NAME);
//5.创建消费者
MessageConsumer consumer = session.createConsumer(messageTopic);
//6.1获取消息(同步方式)
while (true){
TextMessage receive = (TextMessage)consumer.receive();
if (receive!=null){
System.out.println("获取消息:"+receive.getText());
}else{
break;
}
}
//6.2获取消息(异步方式,即监听)
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("获取消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//因为使用异步获取,一次不一定能获取到消息,所以需要多次获取(即自定义一个时间)
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
//7.关闭连接
consumer.close();
session.close();
connection.close();
}
}
三、使用ActiveMQ控制台查看具体信息
1.队列模式2.主题模式
还没有评论,来说两句吧...