activeMQ入门
目录
一、下载activeMQ包
二、准备Linux环境
三、所遇问题
一、下载activeMQ包
可以直接去官网下载相应的包:[https://activemq.apache.org/][https_activemq.apache.org]
二、准备Linux环境
1、新建文件夹 activeMQ
mkdir activeMQ
2、文件夹授权(在文件夹里)
chmod 777 /activeMQ
3、将下载好的apache-activemq-5.15.15-bin.tar.gz直接上传到Linux新建的文件夹并解压
tar -xvf file.tar //解压 tar包
tar -xzvf file.tar.gz //解压tar.gz
tar -xjvf file.tar.bz2 //解压 tar.bz2
tar -xZvf file.tar.Z //解压tar.Z
unrar e file.rar //解压rar
unzip file.zip //解压zip
4、给activemq的bin目录授权
chmod 755 /bin
5、启动activemq
./activemq
6、查看运行状态
./activemq status
7、常用activemq命令
删除队列中的所有消息:activemq purge 队列名称
显示默认broker的所有主题和队列统计信息: activemq dstat
显示的主题统计信息:activemq dstat topics
显示队列的统计信息:activemq dstat queue
8、通过页面访问后台 [http://IP:8161/admin/][http_IP_8161_admin]
![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MjIyODk1MA_size_16_color_FFFFFF_t_70][]
三、代码实现消费者和生产者
1、消息生产者
package com.example.activemq.provider;
import com.example.activemq.model.TestMqBean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.*;
@RunWith(SpringJUnit4ClassRunner.class)
public class providerTest {
public static void main(String[] args) {
Connection connection;
Session session;
Destination destination_request,destination_response;
MessageProducer producer;
MessageConsumer consumer;
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://8.129.14.132:61617");
try {
connection = activeMQConnectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination_request = session.createQueue("myQueue");
destination_response = session.createQueue("response-queue");
producer = session.createProducer(destination_request);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
consumer = session.createConsumer(destination_response);
TestMqBean bean = new TestMqBean();
bean.setAge(13);
for (int i = 0; i < 10; i++) {
bean.setName("send to data -" + i);
producer.send(session.createObjectMessage(bean));
}
producer.close();
System.out.println("消息发送成功...");
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (null != message) {
TextMessage textMsg = (TextMessage) message;
System.out.println("收到回馈消息" +textMsg.getText());
}
} catch (Exception e) {
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2、消息消费者
package com.example.activemq.consumer;
import com.example.activemq.model.TestMqBean;
import lombok.Data;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.runner.RunWith;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.*;
import java.util.Date;
@RunWith(SpringJUnit4ClassRunner.class)
public class consumerTest {
public static void main(String[] args) {
Connection connection = null;
final Session session;
Destination destination_request,destination_response;
MessageConsumer consumer;
final MessageProducer producer;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://8.129.14.132:61617");
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination_request = session.createQueue("myQueue");
destination_response = session.createQueue("response-queue");
consumer = session.createConsumer(destination_request);
producer= session.createProducer(destination_response);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TestMqBean bean = (TestMqBean) ((ObjectMessage) message).getObject();
System.out.println(bean);
if (null != message) {
System.out.println("收到消息" + bean.getName());
Message textMessage = session.createTextMessage("已经成功收到消息,现在开始回复"+ new Date().toString());
producer.send(textMessage);
}
} catch (Exception e) {
// TODO: handle exception
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
四、所遇问题
1、无法启动,通过/activeMQ/apache-activemq-5.15.15/data/activemq.log日志可以看出来异常信息如下:61616被占用了
2021-06-25 10:02:50,276 | INFO | Apache ActiveMQ 5.15.15 (localhost, ID:iZwz9bcdfzd651eqpfascfZ-33253-1624586570146-0:1) is starting | org.apache.activemq.broker.BrokerService | main
2021-06-25 10:02:50,302 | ERROR | Failed to start Apache ActiveMQ (localhost, ID:iZwz9bcdfzd651eqpfascfZ-33253-1624586570146-0:1) | org.apache.activemq.broker.BrokerService | main
java.io.IOException: Transport Connector could not be registered in JMX: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use (Bind failed)
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:28)[activemq-client-5.15.15.jar:5.15.15]
at org.apache.activemq.broker.BrokerService.registerConnectorMBean(BrokerService.java:2277)[activemq-broker-5.15.15.jar:5.15.15]
at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2757)[activemq-broker-5.15.15.jar:5.15.15]
at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2653)[activemq-broker-5.15.15.jar:5.15.15]
at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:777)[activemq-broker-5.15.15.jar:5.15.15]
at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:739)[activemq-broker-5.15.15.jar:5.15.15]
at org.apache.activemq.broker.BrokerService.start(BrokerService.java:642)[activemq-broker-5.15.15.jar:5.15.15]
at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:73)[activemq-spring-5.15.15.jar:5.15.15]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_272]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_272]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_272]
at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_272]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1748)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1685)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1615)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:553)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756)[spring-beans-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)[spring-context-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542)[spring-context-4.3.30.RELEASE.jar:4.3.30.RELEASE]
at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)[xbean-spring-4.18.jar:4.18]
at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)[xbean-spring-4.18.jar:4.18]
at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)[activemq-spring-5.15.15.jar:5.15.15]
at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)[activemq-spring-5.15.15.jar:5.15.15]
at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)[activemq-spring-5.15.15.jar:5.15.15]
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)[activemq-broker-5.15.15.jar:5.15.15]
at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)[activemq-broker-5.15.15.jar:5.15.15]
at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)[activemq-console-5.15.15.jar:5.15.15]
at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.15.jar:5.15.15]
at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:154)[activemq-console-5.15.15.jar:5.15.15]
at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:63)[activemq-console-5.15.15.jar:5.15.15]
at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)[activemq-console-5.15.15.jar:5.15.15]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_272]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_272]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_272]
at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_272]
at org.apache.activemq.console.Main.runTaskClass(Main.java:262)[activemq.jar:5.15.15]
at org.apache.activemq.console.Main.main(Main.java:115)[activemq.jar:5.15.15]
2021-06-25 10:02:50,309 | INFO | Apache ActiveMQ 5.15.15 (localhost, ID:iZwz9bcdfzd651eqpfascfZ-33253-1624586570146-0:1) is shutting down | org.apache.activemq.broker.BrokerService | main
2021-06-25 10:02:50,310 | INFO | Connector openwire stopped | org.apache.activemq.broker.TransportConnector | main
2021-06-25 10:02:50,311 | INFO | Connector amqp stopped | org.apache.activemq.broker.TransportConnector | main
2021-06-25 10:02:50,312 | INFO | Connector stomp stopped | org.apache.activemq.broker.TransportConnector | main
2021-06-25 10:02:50,337 | INFO | Connector mqtt stopped | org.apache.activemq.broker.TransportConnector | main
2021-06-25 10:02:50,338 | INFO | Connector ws stopped | org.apache.activemq.broker.TransportConnector | main
2021-06-25 10:02:50,344 | INFO | PListStore:[/activeMQ/apache-activemq-5.15.15/data/localhost/tmp_storage] stopped | org.apache.activemq.store.kahadb.plist.PListStoreImpl | main
2021-06-25 10:02:50,345 | INFO | Stopping async queue tasks | org.apache.activemq.store.kahadb.KahaDBStore | main
2021-06-25 10:02:50,346 | INFO | Stopping async topic tasks | org.apache.activemq.store.kahadb.KahaDBStore | main
2021-06-25 10:02:50,346 | INFO | Stopped KahaDB | org.apache.activemq.store.kahadb.KahaDBStore | main
2、用命令netstat -ano |findstr 61616 查看下端口是否被占用;
3、修改我们activemq的端口号,/activeMQ/apache-activemq-5.15.15/conf/activemq.xml,可以看到我将其改成了61617,再次启动就没问题了
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
还没有评论,来说两句吧...