ZeroMQ、RabbitMQ、ActiveMQ 快来打我* 2023-02-24 13:27 24阅读 0赞 # 【ZeroMQ】 # ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。 ZeroMQ是网络通信中的新的一层,结余应用层和传输层之间按照tcp/ip划分。是一个可伸缩层,并行运行,分散在分布式系统间。 ZeroMQ号称是“史上最快的消息队列”,基于c语言开发的,实时流处理sorm的task之间的通信就是用的zeroMQ # 【RabbitMQ】 # RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 # 【ActiveMQ】 # Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。 ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。 **【ActiveMQ的两种消息传递类型】** 1、点对点传输,一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。 2、基于发布/订阅模式的传输,根据订阅话题来接收相应数据,一个生产者可以向多个消费者推送数据。 **【总结】** 点对点传输:消费者可以接收到在连接之前生产者所推送的数据; 发布/订阅:消费者只能接收到连接之后生产者推送的数据。 **【ActiveMQ小demo 点对点形式】** **1、启动ActiveMQ** ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ] ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 1] ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 2] **登录名及密码:admin admin** **它的日志在 apache-activemq-5.14.4/data中** ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 3] **2、在Spring boot项目中加入对activeMq的依赖。** <!--消息队列 activeMq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> **3、ActiveMQ生产者** package com.example.demo; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * ActiveMQ生产者 * 2020年5月3日19:05:16 */ public class ActiveMqProducer { private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static void main(String[] args) { try{ // 创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 创建连接 Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接 connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createQueue("myQueue"); // 创建一个生产者 MessageProducer producer = session.createProducer(destination); // 向队列推送10个文本消息数据 for (int i = 1 ; i <= 10 ; i++){ // 创建文本消息 TextMessage message = session.createTextMessage("第" + i + "个文本消息"); //发送消息 producer.send(message); //在本地打印消息 System.out.println("已发送的消息:" + message.getText()); } //关闭连接 connection.close(); }catch (Exception e){ e.printStackTrace(); } } } # 【PS:这里的tcp端口号为什么是61616呢?】 # **ActiveMQ允许客户端使用多种协议来连接 配置Transport Connector的文件在activeMQ安装目录的conf/activemq.xml中的<transportConnectors>标签之内。** <transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors> **ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM** **Transmission Control Protocol (TCP)** 1:这是默认的Broker配置,TCP的Client监听端口是61616。 2:在网络传输数据前,必须要序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。默认情况下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互。 3:TCP连接的URI形式:tcp://hostname:port?key=value&key=value,加粗部分是必须的 4:TCP传输的优点: (1)TCP协议传输可靠性高,稳定性强 (2)高效性:字节流方式传递,效率很高 (3)有效性、可用性:应用广泛,支持任何平台 5:所有关于Transport协议的可配置参数,可以参见: [http://activemq.apache.org/configuring-version-5-transports.html][http_activemq.apache.org_configuring-version-5-transports.html] <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> **New I/O API Protocol(NIO)** 1:NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务端有更多的负载。 2:适合使用NIO协议的场景: (1)可能有大量的Client去链接到Broker上一般情况下,大量的Client去链接Broker是被操作系统的线程数所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议 (2)可能对于Broker有一个很迟钝的网络传输NIO比TCP提供更好的性能 3:NIO连接的URI形式:nio://hostname:port?key=value 4:Transport Connector配置示例: 复制代码 <transportConnectors> <transportConnector name="tcp" uri="tcp://localhost:61616?trace=true" /> <transportConnector name="nio" uri="nio://localhost:61618?trace=true" /> </transportConnectors> 复制代码 上面的配置,示范了一个TCP协议监听61616端口,一个NIO协议监听61618端口 **User Datagram Protocol(UDP)** 1:UDP和TCP的区别 (1)TCP是一个原始流的传递协议,意味着数据包是有保证的,换句话说,数据包是不会被复制和丢失的。UDP,另一方面,它是不会保证数据包的传递的 (2)TCP也是一个稳定可靠的数据包传递协议,意味着数据在传递的过程中不会被丢失。这样确保了在发送和接收之间能够可靠的传递。相反,UDP仅仅是一个链接协议,所以它没有可靠性之说 2:从上面可以得出:TCP是被用在稳定可靠的场景中使用的;UDP通常用在快速数据传递和不怕数据丢失的场景中,还有ActiveMQ通过防火墙时,只能用UDP 3:UDP连接的URI形式:udp://hostname:port?key=value 4:Transport Connector配置示例: <transportConnectors> <transportConnector name="udp" uri="udp://localhost:61618?trace=true" /> </transportConnectors> **Secure Sockets Layer Protocol (SSL)** 1:连接的URI形式:ssl://hostname:port?key=value 2:Transport Connector配置示例: <transportConnectors> <transportConnector name="ssl" uri="ssl://localhost:61617?trace=true"/> </transportConnectors> **Hypertext Transfer Protocol (HTTP/HTTPS)** 1:像web和email等服务需要通过防火墙来访问的,Http可以使用这种场合 2:连接的URI形式:[http://hostname:port?key=value或者https://hostname:port?key=value][http_hostname_port_key_value_https_hostname_port_key_value] 3:Transport Connector配置示例: <transportConnectors> <transportConnector name="http" uri="[http://localhost:8080?trace=true][http_localhost_8080_trace_true]" /> </transportConnectors> **VM Protocol(VM)** 1、VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连 接不是socket连接,而是直接的方法调用。 2、第一个创建VM连接的客户会启动一个embed VM broker,接下来所有使用相同的 broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭 的时候,这个broker也会自动关闭。 3、连接的URI形式:vm://brokerName?key=value 4、Java中嵌入的方式: vm:broker:(tcp://localhost:6000)?brokerName=embeddedbroker&persistent=fal se , 定义了一个嵌入的broker名称为embededbroker以及配置了一个 tcptransprotconnector在监听端口6000上 5、使用一个加载一个配置文件来启动broker vm://localhost?brokerConfig=xbean:activemq.xml 参考资料:[《ActiveMQ支持的传输协议》][ActiveMQ] **启动ActiveMqProducer.java后:** ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 4] ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 5] 接下来我们创建消费者(接受者) package com.example.demo; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * ActiveMQ 消费者 * 2020年5月3日19:12:04 */ public class ActiveMQConsumer { private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static void main(String[] args) throws JMSException { // 创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 创建连接 Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接 connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createQueue("myQueue"); // 创建消费者 MessageConsumer consumer = session.createConsumer(destination); // 创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } } **启动Consumer.java后** ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 6] **我们可以通过ActiveMQ主动给消费者发送消息** ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 7] ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 8] # 【拓展】 # **到这里笔者,又有想法了。如果我们一个ActiveMQproductor给多个consumer发送消息会出现什么状况呢?(不妨一试)** **这次我们先启动,多个consumer,再启动productor** **1、创建n个consumer** package com.example.demo; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQConsumerFrozen { public static void main(String[] args) throws JMSException { // 创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConsumer.ACTIVEMQ_URL); // 创建连接 Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接 connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createQueue("myQueue"); // 创建消费者 MessageConsumer consumer = session.createConsumer(destination); // 创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("Frozen消费的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } } package com.example.demo; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQConsumerRedAnt { public static void main(String[] args) throws JMSException { // 创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConsumer.ACTIVEMQ_URL); // 创建连接 Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接 connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createQueue("myQueue"); // 创建消费者 MessageConsumer consumer = session.createConsumer(destination); // 创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("RedAnt消费的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } } **2依次启动** ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 9] ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 10] ![ZeroMQ、RabbitMQ、ActiveMQ][ZeroMQ_RabbitMQ_ActiveMQ 11] **3好了一个演示,说明问题:轮询的形式** # 【接下来,是第二种小demo:发布/订阅模式】 # # 首先生产者productor,它与点对点的形式的区别在于: # # 【点对点形式】 # // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createQueue("myQueue"); # 【发布/订阅模式】 # // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createTopic("fronzenTop"); package com.example.demo; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 基于发布/订阅模式 * 2020年5月4日11:59:56 */ public class ActiveMqProducerTop { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static void main(String[] args) { try{ // 创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 创建连接 Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接 connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createTopic("fronzenTop"); // 创建一个生产者 MessageProducer producer = session.createProducer(destination); // 向队列推送10个文本消息数据 for (int i = 1 ; i <= 10 ; i++){ // 创建文本消息 TextMessage message = session.createTextMessage("第" + i + "个文本消息"); //发送消息 producer.send(message); //在本地打印消息 System.out.println("已发送的消息:" + message.getText()); } //关闭连接 connection.close(); }catch (Exception e){ e.printStackTrace(); } } } # 同理消费者consumer也是这点差别 # // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createQueue("myQueue"); // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createTopic("fronzenTop"); package com.example.demo; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** *基于发布/订阅模式 * 2020年5月4日12:05:52 */ public class ActiveMQConsumerTop { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static void main(String[] args) throws JMSException { // 创建连接工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 创建连接 Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接 connection.start(); // 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createTopic("fronzenTop"); // 创建消费者 MessageConsumer consumer = session.createConsumer(destination); // 创建消费的监听 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } } 现在如果我们先启动生产者,再启动消费者,会发现消费者是无法接收到之前生产者之前所生产的数据,只有消费者先启动,再让生产者消费才可以正常接收数据,这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别 而如果启动两个消费者,那么每一个消费者都能完整的接收到生产者生产的数据,即每一条数据都被消费了两次,这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别。 参考资料:[《浅谈ActiveMQ与使用》][ActiveMQ 1] [ZeroMQ_RabbitMQ_ActiveMQ]: /images/20230209/2f35fba0ce13469f83b288f7fe4ae333.png [ZeroMQ_RabbitMQ_ActiveMQ 1]: /images/20230209/2f9329b3b61448dba73ad74941967845.png [ZeroMQ_RabbitMQ_ActiveMQ 2]: /images/20230209/5d0c434d73784c16b0c428753de543f5.png [ZeroMQ_RabbitMQ_ActiveMQ 3]: /images/20230209/d3b81ee1fa3248c7b9b0e141f109ce26.png [http_activemq.apache.org_configuring-version-5-transports.html]: http://activemq.apache.org/configuring-version-5-transports.html [http_hostname_port_key_value_https_hostname_port_key_value]: http://hostname?key=value%E6%88%96%E8%80%85https://hostname:port?key=value [http_localhost_8080_trace_true]: http://localhost:8080?trace=true [ActiveMQ]: https://www.cnblogs.com/winner-0715/p/6883212.html [ZeroMQ_RabbitMQ_ActiveMQ 4]: /images/20230209/24fc5df8ec974d9295f188dab63f3e05.png [ZeroMQ_RabbitMQ_ActiveMQ 5]: /images/20230209/d99829176c9f40f6b597a3586bca191c.png [ZeroMQ_RabbitMQ_ActiveMQ 6]: /images/20230209/e6e23fbaaf5f4b8f82e263e310177079.png [ZeroMQ_RabbitMQ_ActiveMQ 7]: /images/20230209/d587c3ba43594992bf0ac9f781f8d4db.png [ZeroMQ_RabbitMQ_ActiveMQ 8]: /images/20230209/3345bdec07c947348cbefc77ccd256ca.png [ZeroMQ_RabbitMQ_ActiveMQ 9]: /images/20230209/8edb49254dd446aab8e0e952bc17ce0b.png [ZeroMQ_RabbitMQ_ActiveMQ 10]: /images/20230209/8db26c06914246e99abccec02a3a488e.png [ZeroMQ_RabbitMQ_ActiveMQ 11]: /images/20230209/fc4fdba954b14c71b435fa5e8dfc73cb.png [ActiveMQ 1]: https://www.cnblogs.com/xiguadadage/p/11217604.html
还没有评论,来说两句吧...