Getting Started with ActiveMQ

红太狼 2023-02-09 11:57

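Contents

I. Download the ActiveMQ package

II. Prepare the Linux environment

III. Implementing a producer and a consumer in code

IV. Problems encountered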


I. Download the ActiveMQ package

Download the package you need directly from the official website: https://activemq.apache.org/

II. Prepare the Linux environment

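1. Create the activeMQ directory:

        mkdir /activeMQ

2. Grant permissions on the directory:

        chmod 777 /activeMQ

3. Upload the downloaded apache-activemq-5.15.15-bin.tar.gz to the new directory and extract it:

        cd /activeMQ
        tar -xzvf apache-activemq-5.15.15-bin.tar.gz

   For reference, the extraction commands for common archive formats:

        tar -xvf file.tar          # extract a .tar archive
        tar -xzvf file.tar.gz      # extract a .tar.gz archive
        tar -xjvf file.tar.bz2     # extract a .tar.bz2 archive
        tar -xZvf file.tar.Z       # extract a .tar.Z archive
        unrar e file.rar           # extract a .rar archive
        unzip file.zip             # extract a .zip archive

4. Grant permissions on the ActiveMQ bin directory (the extracted one, not the system /bin):

        chmod -R 755 /activeMQ/apache-activemq-5.15.15/bin

5. Start ActiveMQ from the bin directory:

        cd /activeMQ/apache-activemq-5.15.15/bin
        ./activemq start

6. Check the running status:

        ./activemq status

7. Commonly used activemq commands:

        activemq purge <queue name>   # delete all messages in a queue
        activemq dstat                # show statistics for all topics and queues of the default broker
        activemq dstat topics         # show topic statistics only
        activemq dstat queues         # show queue statistics only

8. Open the admin console in a browser: http://IP:8161/admin/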
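The status check and the console URL above can also be verified from another machine by testing whether the broker's ports accept TCP connections. The following is a minimal sketch using only the JDK; the class name `PortProbe` is hypothetical, 8161 is the console port from step 8, and 61616 is assumed to be the OpenWire port (adjust it if activemq.xml configures a different one).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ActiveMQ.
public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unreachable host.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the broker host is passed as the first argument, else localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("admin console (8161): " + isListening(host, 8161, 2000));
        System.out.println("OpenWire (61616): " + isListening(host, 61616, 2000));
    }
}
```

If the console port answers but the OpenWire port does not, the web console is up while the transport connector failed to start, which is worth checking in data/activemq.log.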

III. Implementing a producer and a consumer in code

1. Message producer

        package com.example.activemq.provider;

        import com.example.activemq.model.TestMqBean;
        import org.apache.activemq.ActiveMQConnectionFactory;

        import javax.jms.*;

        public class providerTest {
            public static void main(String[] args) {
                // Broker credentials and OpenWire address; 61617 is the port configured in activemq.xml.
                ActiveMQConnectionFactory activeMQConnectionFactory =
                        new ActiveMQConnectionFactory("admin", "admin", "tcp://8.129.14.132:61617");
                try {
                    Connection connection = activeMQConnectionFactory.createConnection();
                    connection.start();
                    // Non-transacted session; received messages must be acknowledged explicitly.
                    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                    Destination destination_request = session.createQueue("myQueue");
                    Destination destination_response = session.createQueue("response-queue");
                    MessageProducer producer = session.createProducer(destination_request);
                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                    MessageConsumer consumer = session.createConsumer(destination_response);

                    // Send ten object messages to myQueue.
                    TestMqBean bean = new TestMqBean();
                    bean.setAge(13);
                    for (int i = 0; i < 10; i++) {
                        bean.setName("send to data -" + i);
                        producer.send(session.createObjectMessage(bean));
                    }
                    producer.close();
                    System.out.println("Messages sent successfully...");

                    // Print every reply the consumer sends to response-queue.
                    consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                if (null != message) {
                                    TextMessage textMsg = (TextMessage) message;
                                    System.out.println("Received reply: " + textMsg.getText());
                                    // Required with CLIENT_ACKNOWLEDGE, otherwise the reply is redelivered.
                                    message.acknowledge();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
2. Message consumer

        package com.example.activemq.consumer;

        import com.example.activemq.model.TestMqBean;
        import org.apache.activemq.ActiveMQConnectionFactory;

        import javax.jms.*;
        import java.util.Arrays;
        import java.util.Date;

        public class consumerTest {
            public static void main(String[] args) {
                ActiveMQConnectionFactory connectionFactory =
                        new ActiveMQConnectionFactory("admin", "admin", "tcp://8.129.14.132:61617");
                // Recent ActiveMQ clients (5.12.2 and later) only deserialize ObjectMessage
                // payloads from trusted packages, so the bean's package is listed here.
                connectionFactory.setTrustedPackages(Arrays.asList("com.example.activemq.model", "java.lang"));
                try {
                    Connection connection = connectionFactory.createConnection();
                    connection.start();
                    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    Destination destination_request = session.createQueue("myQueue");
                    Destination destination_response = session.createQueue("response-queue");
                    MessageConsumer consumer = session.createConsumer(destination_request);
                    final MessageProducer producer = session.createProducer(destination_response);
                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                    // For every bean received on myQueue, send a text reply to response-queue.
                    consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            try {
                                TestMqBean bean = (TestMqBean) ((ObjectMessage) message).getObject();
                                System.out.println(bean);
                                System.out.println("Received message: " + bean.getName());
                                Message textMessage = session.createTextMessage(
                                        "Message received successfully, replying now " + new Date().toString());
                                producer.send(textMessage);
                            } catch (Exception e) {
                                // Report failures instead of swallowing them silently.
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
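Both classes import `com.example.activemq.model.TestMqBean`, which the article does not show. The following is a minimal sketch of what it might look like, assuming only the `name` and `age` properties that the producer and consumer actually call. The consumer's `lombok.Data` import suggests the original bean was generated with Lombok; here the accessors are written by hand, and the package declaration is omitted so the file compiles on its own. The one firm requirement is `Serializable`, because a JMS `ObjectMessage` can only carry serializable objects.

```java
import java.io.Serializable;

// Assumed shape of the bean; in the project it would live in com.example.activemq.model.
public class TestMqBean implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "TestMqBean(name=" + name + ", age=" + age + ")";
    }
}
```

Because the producer and the consumer both deserialize against this class, they need the same version of it on their classpaths; the explicit `serialVersionUID` keeps minor edits from breaking compatibility.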

IV. Problems encountered

1. The broker fails to start. The log file /activeMQ/apache-activemq-5.15.15/data/activemq.log shows the exception below: port 61616 is already in use.

        2021-06-25 10:02:50,276 | INFO | Apache ActiveMQ 5.15.15 (localhost, ID:iZwz9bcdfzd651eqpfascfZ-33253-1624586570146-0:1) is starting | org.apache.activemq.broker.BrokerService | main
        2021-06-25 10:02:50,302 | ERROR | Failed to start Apache ActiveMQ (localhost, ID:iZwz9bcdfzd651eqpfascfZ-33253-1624586570146-0:1) | org.apache.activemq.broker.BrokerService | main
        java.io.IOException: Transport Connector could not be registered in JMX: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use (Bind failed)
            at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:28)[activemq-client-5.15.15.jar:5.15.15]
            at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:2277)[activemq-broker-5.15.15.jar:5.15.15]
            at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2757)[activemq-broker-5.15.15.jar:5.15.15]
            at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2653)[activemq-broker-5.15.15.jar:5.15.15]
            at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:777)[activemq-broker-5.15.15.jar:5.15.15]
            at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:739)[activemq-broker-5.15.15.jar:5.15.15]
            at org.apache.activemq.broker.BrokerService.start(BrokerService.java:642)[activemq-broker-5.15.15.jar:5.15.15]
            at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:73)[activemq-spring-5.15.15.jar:5.15.15]
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_272]
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_272]
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_272]
            at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_272]
            at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1748)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1685)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1615)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:553)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)[spring-context-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542)[spring-context-4.3.30.RELEASE.jar:4.3.30.RELEASE]
            at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)[xbean-spring-4.18.jar:4.18]
            at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)[xbean-spring-4.18.jar:4.18]
            at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)[activemq-spring-5.15.15.jar:5.15.15]
            at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)[activemq-spring-5.15.15.jar:5.15.15]
            at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)[activemq-spring-5.15.15.jar:5.15.15]
            at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)[activemq-broker-5.15.15.jar:5.15.15]
            at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)[activemq-broker-5.15.15.jar:5.15.15]
            at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)[activemq-console-5.15.15.jar:5.15.15]
            at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.15.jar:5.15.15]
            at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154)[activemq-console-5.15.15.jar:5.15.15]
            at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.15.jar:5.15.15]
            at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)[activemq-console-5.15.15.jar:5.15.15]
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_272]
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_272]
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_272]
            at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_272]
            at org.apache.activemq.console.Main.runTaskClass(Main.java:262)[activemq.jar:5.15.15]
            at org.apache.activemq.console.Main.main(Main.java:115)[activemq.jar:5.15.15]
        2021-06-25 10:02:50,309 | INFO | Apache ActiveMQ 5.15.15 (localhost, ID:iZwz9bcdfzd651eqpfascfZ-33253-1624586570146-0:1) is shutting down | org.apache.activemq.broker.BrokerService | main
        2021-06-25 10:02:50,310 | INFO | Connector openwire stopped | org.apache.activemq.broker.TransportConnector | main
        2021-06-25 10:02:50,311 | INFO | Connector amqp stopped | org.apache.activemq.broker.TransportConnector | main
        2021-06-25 10:02:50,312 | INFO | Connector stomp stopped | org.apache.activemq.broker.TransportConnector | main
        2021-06-25 10:02:50,337 | INFO | Connector mqtt stopped | org.apache.activemq.broker.TransportConnector | main
        2021-06-25 10:02:50,338 | INFO | Connector ws stopped | org.apache.activemq.broker.TransportConnector | main
        2021-06-25 10:02:50,344 | INFO | PListStore:[/activeMQ/apache-activemq-5.15.15/data/localhost/tmp_storage] stopped | org.apache.activemq.store.kahadb.plist.PListStoreImpl | main
        2021-06-25 10:02:50,345 | INFO | Stopping async queue tasks | org.apache.activemq.store.kahadb.KahaDBStore | main
        2021-06-25 10:02:50,346 | INFO | Stopping async topic tasks | org.apache.activemq.store.kahadb.KahaDBStore | main
        2021-06-25 10:02:50,346 | INFO | Stopped KahaDB | org.apache.activemq.store.kahadb.KahaDBStore | main
2. Check whether the port is in use (on Linux, grep replaces the Windows findstr):

        netstat -anp | grep 61616

3. Change the ActiveMQ port in /activeMQ/apache-activemq-5.15.15/conf/activemq.xml. Here the openwire connector was changed to 61617, after which the broker started without problems. Note that inside the XML file the & in each URI must be written as &amp;amp;.

        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
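Before picking a replacement port, it can help to confirm that the candidate is actually free on the broker host. The sketch below does that with plain JDK sockets by trying to bind to the port, which is the same operation that failed with `BindException` in the log. The class name `PortCheck` is hypothetical, and the two ports are simply the ones discussed in this section.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper, not part of ActiveMQ; run it on the broker host.
public class PortCheck {

    /** Returns true if a listening socket can be bound to the port on all interfaces. */
    public static boolean isFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            // Binding succeeded; the socket is released again when the block exits.
            return true;
        } catch (IOException e) {
            // Typically java.net.BindException: Address already in use.
            return false;
        }
    }

    public static void main(String[] args) {
        for (int port : new int[] {61616, 61617}) {
            System.out.println(port + (isFree(port) ? " is free" : " is already in use"));
        }
    }
}
```

The result is only a snapshot: another process could take the port between this check and the broker start, so the activemq.log remains the authoritative place to look if startup still fails.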
