ActiveMQ之初体验

短命女 2023-06-17 05:59 139阅读 0赞

一、下载ActiveMQ并在Linux上安装

1.到官网下载,网址为http://activemq.apache.org/components/classic/download/
在这里插入图片描述
2.在Linux虚拟机中解压即可,前提需要Java环境,然后进入解压后的bin目录执行./activemq start > /opt/tool/ActiveMQ/run_activemq.log启动ActiveMQ服务,可执行以下命令验证是否启动
在这里插入图片描述3.进入http://你的虚拟机的IP地址:8161/admin/topics.jsp即可查看控制台,注意默认账号密码都是admin
在这里插入图片描述二、使用Java程序体验一下消息中间件的作用

1.在IDEA中创建一个springboot项目,然后配置pom.xml文件

  1. <!-- activemq所需要的jar包配置-->
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>activemq-all</artifactId>
  5. <version>5.15.9</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.xbean</groupId>
  9. <artifactId>xbean-spring</artifactId>
  10. <version>3.16</version>
  11. </dependency>

2.1.使用队列方式体验(一对一模式)

(1)Producer(生产者)

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. /** * @Description 生产者向ActiveMQ发送消息 * @auther XX * @create XXX */
  4. public class JMSProducer {
  5. private static final String ACTIVEMQ_URL="tcp://192.168.120.131:61616";
  6. private static final String QUEUE_NAME ="myQueue-01";
  7. public static void main(String[] args) throws JMSException {
  8. //1.创建连接工厂
  9. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  10. //2.创建连接并开启
  11. Connection connection = activeMQConnectionFactory.createConnection();
  12. connection.start();
  13. //3.获取会话
  14. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  15. //4.创建消息队列
  16. Queue queueMessage = session.createQueue(QUEUE_NAME);
  17. //5.创建生产者n
  18. MessageProducer producer = session.createProducer(queueMessage);
  19. //6.发送消息
  20. for (int i = 1; i <= 7; i++) {
  21. TextMessage textMessage = session.createTextMessage("生产消息"+i);
  22. producer.send(textMessage);
  23. }
  24. //7.关闭连接
  25. producer.close();
  26. session.close();
  27. connection.close();
  28. }
  29. }

(2)Consumer(消费者):

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. import java.util.concurrent.TimeUnit;
  4. /** * @Description 消费者向ActiveMQ接受消息 * @auther XX * @create XXX */
  5. public class JMSConsumer {
  6. private static final String ACTIVEMQ_URL="tcp://192.168.120.131:61616";
  7. private static final String QUEUE_NAME ="myQueue-01";
  8. public static void main(String[] args) throws JMSException, InterruptedException {
  9. //1.创建连接工厂
  10. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  11. //2.创建连接并开启
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. connection.start();
  14. //3.创建会话
  15. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  16. //4.创建需要获取的队列
  17. Queue messageQueue = session.createQueue(QUEUE_NAME);
  18. //5.创建消费者
  19. MessageConsumer consumer = session.createConsumer(messageQueue);
  20. //6.1获取消息(同步方式)
  21. while (true){
  22. TextMessage receive = (TextMessage)consumer.receive();
  23. if (receive!=null){
  24. System.out.println("获取消息:"+receive.getText());
  25. }else{
  26. break;
  27. }
  28. }
  29. //6.2获取消息(异步方式,即监听模式)
  30. consumer.setMessageListener(new MessageListener() {
  31. @Override
  32. public void onMessage(Message message) {
  33. TextMessage textMessage=(TextMessage) message;
  34. try {
  35. System.out.println("获取消息:"+textMessage.getText());
  36. } catch (JMSException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. });
  41. //使用监听器方式获取消息有可能一次获取不到,所以得一直监听
  42. TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
  43. //7.关闭
  44. consumer.close();
  45. session.close();
  46. connection.close();
  47. }
  48. }

2.2.使用主题方式体验(一对多模式)

(1)Producer(生产者):

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. /** * @Description 生产者向ActiveMQ发送消息 * @auther XX * @create XXX */
  4. public class JMSProducer_Topic {
  5. private static final String ACTIVEMQ_URL="tcp://192.168.120.131:61616";
  6. private static final String TOPIC_NAME ="myTopic-01";
  7. public static void main(String[] args) throws JMSException {
  8. //1.创建连接工厂
  9. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  10. //2.创建连接并开启
  11. Connection connection = activeMQConnectionFactory.createConnection();
  12. connection.start();
  13. //3.创建会话
  14. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  15. //4.创建要发送的主题
  16. Topic messageTopic = session.createTopic(TOPIC_NAME);
  17. //5.创建生产者
  18. MessageProducer producer = session.createProducer(messageTopic);
  19. //6.发送消息
  20. for (int i = 1; i <= 6; i++) {
  21. TextMessage textMessage = session.createTextMessage("创建消息:"+i);
  22. producer.send(textMessage);
  23. }
  24. //7.关闭连接
  25. producer.close();
  26. session.close();
  27. connection.close();
  28. }
  29. }

(2)Consumer(消费者):

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. import java.util.concurrent.TimeUnit;
  4. /** * @Description 消费者向ActiveMQ接受消息 * @auther XX * @create XXX */
  5. public class JMSConsumer_Topic {
  6. private static final String ACTIVEMQ_URL="tcp://192.168.120.131:61616";
  7. private static final String TOPIC_NAME ="myTopic-01";
  8. public static void main(String[] args) throws JMSException, InterruptedException {
  9. //1.创建连接工厂
  10. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  11. //2.创建连接并开启
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. connection.start();
  14. //3.创建会话
  15. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  16. //4.创建要接受消息的主题
  17. Topic messageTopic = session.createTopic(TOPIC_NAME);
  18. //5.创建消费者
  19. MessageConsumer consumer = session.createConsumer(messageTopic);
  20. //6.1获取消息(同步方式)
  21. while (true){
  22. TextMessage receive = (TextMessage)consumer.receive();
  23. if (receive!=null){
  24. System.out.println("获取消息:"+receive.getText());
  25. }else{
  26. break;
  27. }
  28. }
  29. //6.2获取消息(异步方式,即监听)
  30. consumer.setMessageListener(new MessageListener() {
  31. @Override
  32. public void onMessage(Message message) {
  33. TextMessage textMessage = (TextMessage)message;
  34. try {
  35. System.out.println("获取消息:"+textMessage.getText());
  36. } catch (JMSException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. });
  41. //因为使用异步获取,一次不一定能获取到消息,所以需要多次获取(即自定义一个时间)
  42. TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
  43. //7.关闭连接
  44. consumer.close();
  45. session.close();
  46. connection.close();
  47. }
  48. }

三、使用ActiveMQ控制台查看具体信息

1.队列模式
在这里插入图片描述2.主题模式
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,139人围观)

还没有评论,来说两句吧...

相关阅读

    相关 springcloud体验

    前面几篇博客中我们大概讲解了一下spring boot的知识,话说管理这些微服务的架构就构成了spring cloud,我们一样,先不说原理先搭建出来再说; 我们今天主要搭

    相关 spring boot体验

    Spring boot 简单搭建 我先不讲spring boot以及最后spring cloud的原理了,相信选择看这篇博客的人已经试着在搭建项目这件事上跃跃欲试了,而非关心

    相关 兄弟连体验

    不知不觉来兄弟连将近一个月了,在这一个月的时间里我学到了很多,专业知识倒是其次,更重要的是一种学习的氛围、一种思考的方式与为人处事的方法,即使四个月后我从兄弟连毕业走上工作岗位

    相关 ActiveMQ——1.ActiveMQ

    前言 中间件 由于业务的不同、技术的发展、硬件和软件的选择有所差别,导致了异构组件或应用并存的局面。要使这些异构的组件协同工作,一个有效的方式就是提供一个允许它们进