MQ持久化消息

朱雀 2020-11-06 13:12 941阅读 0赞

ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,所以本节分析一下几种持久化方式:

一、持久化为文件

ActiveMQ默认就支持这种方式,只要在发消息时设置消息为持久化就可以了。

打开安装目录下的配置文件:

D:\ActiveMQ\apache-activemq\conf\activemq.xml在越80行会发现默认的配置项:

  1. <persistenceAdapter>
  2. <kahaDB directory="$\{activemq.data\}/kahadb"/>
  3. </persistenceAdapter>

注意这里使用的是kahaDB,是一个基于文件支持事务的消息存储器,是一个可靠,高性能,可扩展的消息存储器。

他的设计初衷就是使用简单并尽可能的快。KahaDB的索引使用一个transaction log,并且所有的destination只使用一个index,有人测试表明:如果用于生产环境,支持1万个active connection,每个connection有一个独立的queue。该表现已经足矣应付大部分的需求。

然后再发送消息的时候改变第二个参数为:

MsgDeliveryMode.Persistent

Message保存方式有2种
PERSISTENT:保存到磁盘,consumer消费之后,message被删除。
NON_PERSISTENT:保存到内存,消费之后message被清除。
注意:堆积的消息太多可能导致内存溢出。

然后打开生产者端发送一个消息:

wps30F4.tmp

不启动消费者端,同时在管理界面查看:

wps3105.tmp

发现有一个消息正在等待,这时如果没有持久化,ActiveMQ宕机后重启这个消息就是丢失,而我们现在修改为文件持久化,重启ActiveMQ后消费者仍然能够收到这个消息。

wps3106.tmp

二、持久化为数据库

我们从支持Mysql为例,先从http://dev.mysql.com/downloads/connector/j/下载mysql-connector-java-5.1.34-bin.jar包放到:

D:\ActiveMQ\apache-activemq\lib目录下。

打开并修改配置文件:

复制代码

  1. <beans
  2. xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  5. http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
  6. <!-- Allows us to use system properties as variables in this configuration file -->
  7. <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  8. <property name="locations">
  9. <value>file:${activemq.conf}/credentials.properties</value>
  10. </property>
  11. </bean>
  12. <!-- Allows accessing the server log -->
  13. <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
  14. lazy-init="false" scope="singleton"
  15. init-method="start" destroy-method="stop">
  16. </bean>
  17. <!--
  18. The <broker> element is used to configure the ActiveMQ broker.
  19. -->
  20. <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
  21. <destinationPolicy>
  22. <policyMap>
  23. <policyEntries>
  24. <policyEntry topic=">" >
  25. <!-- The constantPendingMessageLimitStrategy is used to prevent
  26. slow topic consumers to block producers and affect other consumers
  27. by limiting the number of messages that are retained
  28. For more information, see:
  29. http://activemq.apache.org/slow-consumer-handling.html
  30. -->
  31. <pendingMessageLimitStrategy>
  32. <constantPendingMessageLimitStrategy limit="1000"/>
  33. </pendingMessageLimitStrategy>
  34. </policyEntry>
  35. </policyEntries>
  36. </policyMap>
  37. </destinationPolicy>
  38. <!--
  39. The managementContext is used to configure how ActiveMQ is exposed in
  40. JMX. By default, ActiveMQ uses the MBean server that is started by
  41. the JVM. For more information, see:
  42. http://activemq.apache.org/jmx.html
  43. -->
  44. <managementContext>
  45. <managementContext createConnector="false"/>
  46. </managementContext>
  47. <!--
  48. Configure message persistence for the broker. The default persistence
  49. mechanism is the KahaDB store (identified by the kahaDB tag).
  50. For more information, see:
  51. http://activemq.apache.org/persistence.html
  52. <kahaDB directory="${activemq.data}/kahadb"/>
  53. -->
  54. <persistenceAdapter>
  55. <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
  56. </persistenceAdapter>
  57. <!--
  58. The systemUsage controls the maximum amount of space the broker will
  59. use before disabling caching and/or slowing down producers. For more information, see:
  60. http://activemq.apache.org/producer-flow-control.html
  61. -->
  62. <systemUsage>
  63. <systemUsage>
  64. <memoryUsage>
  65. <memoryUsage percentOfJvmHeap="70" />
  66. </memoryUsage>
  67. <storeUsage>
  68. <storeUsage limit="100 gb"/>
  69. </storeUsage>
  70. <tempUsage>
  71. <tempUsage limit="50 gb"/>
  72. </tempUsage>
  73. </systemUsage>
  74. </systemUsage>
  75. <!--
  76. The transport connectors expose ActiveMQ over a given protocol to
  77. clients and other brokers. For more information, see:
  78. http://activemq.apache.org/configuring-transports.html
  79. -->
  80. <transportConnectors>
  81. <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
  82. <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
  83. <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
  84. <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
  85. <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
  86. <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
  87. </transportConnectors>
  88. <!-- destroy the spring context on shutdown to stop jetty -->
  89. <shutdownHooks>
  90. <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
  91. </shutdownHooks>
  92. </broker>
  93. <bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  94. <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  95. <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
  96. <property name="username" value="root"/>
  97. <property name="password" value=""/>
  98. <property name="maxActive" value="200"/>
  99. <property name="poolPreparedStatements" value="true"/>
  100. </bean>
  101. <!--
  102. Enable web consoles, REST and Ajax APIs and demos
  103. The web consoles requires by default login, you can disable this in the jetty.xml file
  104. Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
  105. -->
  106. <import resource="jetty.xml"/>
  107. </beans>
  108. <!-- END SNIPPET: example -->

复制代码

重启ActiveMQ打开phpmyadmin发现多了3张表:

wps3107.tmp

然后启动生产者(不启动消费者)

在Mysql中可以找到这条消息:

wps3108.tmp

关掉ActiveMQ并重启,模拟宕机。

然后启动消费者:

wps3118.tmp

然后发现Mysql中已经没有这条消息了。

发表评论

表情:
评论列表 (有 0 条评论,941人围观)

还没有评论,来说两句吧...

相关阅读

    相关 ActiveMQ消息持久

    前言 在前面我我们讲JMS规范的时候有简单的说过activeMQ的消息持久化,演示了如何来设置使消息能够持久化存储。 本次呢,我们将来深入了解activeMQ的消息持久

    相关 四.消息持久

    当rabbitMq重启的时候,消息依然会丢失。 RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。但是,仍然有一个非

    相关 MQ持久消息

    ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,所以本节分析一下几种持久化方式: 一、持久化为文件 ActiveMQ默认...