四.消息的持久化

我不是女神ヾ 2022-06-11 22:52 311阅读 0赞

当rabbitMq重启的时候,消息依然会丢失。

RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上)会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html)。

提供者Producer和消费者Consomer的pom.xml和上一章的一样。

一.提供者Consumer
1.发送消息的类:MessageSender.java

  1. package com.rabbitmq.producer;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import com.rabbitmq.client.Channel;
  7. import com.rabbitmq.client.Connection;
  8. import com.rabbitmq.client.ConnectionFactory;
  9. public class MessageSender {
  10. private Logger logger = LoggerFactory.getLogger(MessageSender.class);
  11. //声明一个列队名字
  12. private final static String QUEUE_NAME = "hello";
  13. /** * 测试rabbit的消息持久化 * RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ, * 消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上) * 会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html) * @param queueName * @return */
  14. public boolean sendMessageDurable(String message){
  15. //new一个rabbitmq的连接工厂
  16. ConnectionFactory factory = new ConnectionFactory();
  17. //设置需要连接的RabbitMQ的机器的地址,这里指向本机
  18. factory.setHost("localhost");
  19. //连接器
  20. Connection connection = null;
  21. //通道
  22. Channel channel = null;
  23. try {
  24. //尝试获取一个连接
  25. connection = factory.newConnection();
  26. //尝试创建一个通道
  27. channel = connection.createChannel();
  28. /*声明一个列队: * 1.queue队列的名字 * 2.是否持久化,是否持久化 为true则在rabbitMQ重启后生存, * 3.自动删除,在最后一个连接断开后删除队列 * 4.其他参数 * */
  29. //注意,RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,对于试图这么做的程序,会报错,所以,改动之前代码之前,要在控制台中把原来的队列删除
  30. //IO异常:java.io.IOException
  31. boolean durable = true;//是否持久化消息,无论是提供者还是消费者,都可以设置,
  32. //不过RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,所有提供者或者
  33. //消费者谁先声明后者不能重新声明
  34. channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
  35. /*发布消息,注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String * 1.交换模式 * 2.控制消息发送到哪个队列 * 3.其他参数 * 4.body 消息,byte数组 * */
  36. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  37. logger.info("已发送:"+message);
  38. //关闭通道和链接(先关闭通道在关闭连接)
  39. channel.close();
  40. connection.close();
  41. } catch (IOException e) {
  42. logger.error("IO异常:"+e);
  43. return false;
  44. } catch (TimeoutException e){
  45. logger.error("超时异常:"+e);
  46. return false;
  47. }
  48. return true;
  49. }
  50. }

2.测试发送消息的Mian:DurableMessageMain.java

  1. package com.rabbitmq.main;
  2. import com.rabbitmq.producer.MessageSender;
  3. public class DurableMessageMain {
  4. /** * 测试消息持久化 * @param args */
  5. public static void main(String[] args) {
  6. MessageSender messageSender = new MessageSender();
  7. messageSender.sendMessageDurable("hellow tiglle");
  8. }
  9. }

二.消费者Consumer
1.接收持久化消息的类

  1. package com.rabbitmq.consumer;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import com.rabbitmq.client.AMQP;
  7. import com.rabbitmq.client.Channel;
  8. import com.rabbitmq.client.Connection;
  9. import com.rabbitmq.client.ConnectionFactory;
  10. import com.rabbitmq.client.Consumer;
  11. import com.rabbitmq.client.DefaultConsumer;
  12. import com.rabbitmq.client.Envelope;
  13. public class MessageRecive {
  14. private Logger logger = LoggerFactory.getLogger(MessageRecive.class);
  15. /** * 测试rabbit的消息持久化 * RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ, * 消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上) * 会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html) * @param queueName * @return */
  16. public boolean durabletMessageConsumer(String queueName){
  17. //连接rabbitmq
  18. ConnectionFactory factory = new ConnectionFactory();
  19. factory.setHost("localhost");
  20. try {
  21. Connection connection = factory.newConnection();
  22. //解决内部类只能访问final修饰的局部变量
  23. final Channel channel = connection.createChannel();
  24. //声明消费的queue
  25. //注意,RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,对于试图这么做的程序,会报错,所以,改动之前代码之前,要在控制台中把原来的队列删除
  26. //IO异常:java.io.IOException
  27. boolean durable = true;//是否持久化消息,无论是提供者还是消费者,都可以设置,
  28. //不过RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,所有提供者或者
  29. //消费者谁先声明后者不能重新声明
  30. channel.queueDeclare(queueName,durable,false,false,null);
  31. //在消息确认之前,不在处理其他消息
  32. //prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
  33. channel.basicQos(1);
  34. //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String(局部内部类)
  35. Consumer consumer = new DefaultConsumer(channel){
  36. //重写父类方法
  37. @Override
  38. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
  39. String message = new String(body, "UTF-8");
  40. logger.info("接收到:" + message);
  41. //休眠10秒,模拟10秒处理事件
  42. try {
  43. logger.info("开始处理消息(休眠)......");
  44. Thread.sleep(10000);
  45. System.out.println("处理完毕!");
  46. } catch (Exception e) {
  47. // TODO: handle exception
  48. }finally {
  49. //手动应答,告诉服务器可以删除消息,否则不删除或给其他消费者
  50. /** * @param deliveryTag the tag from the received 这个是RabbitMQ用来区分消息的,文档在这(https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.deliver.delivery-tag) * @param multiple true to acknowledge all messages up to and 为true的话,确认所有消息,为false只确认当前消息 */
  51. channel.basicAck(envelope.getDeliveryTag(), false);
  52. }
  53. }
  54. };
  55. //上面是声明消费者,这里用 声明的消费者 消费 列队的消息
  56. System.out.println("开始等待提供者的消息....");
  57. //关闭自动应答,改为手动应答,很重要
  58. boolean autoAsk = false;
  59. channel.basicConsume(queueName, autoAsk,consumer);
  60. //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
  61. } catch (Exception e) {
  62. logger.error("出现异常:"+e);
  63. return false;
  64. }
  65. return true;
  66. }
  67. }

2.测试接收持久化消息的Mian方法:

  1. package com.rabbitmq.main;
  2. import com.rabbitmq.consumer.MessageRecive;
  3. public class durableMessageMain {
  4. //从哪个列队取消息
  5. private final static String QUEUE_NAME = "hello";
  6. public static void main(String[] args) {
  7. MessageRecive messageRecive = new MessageRecive();
  8. messageRecive.durabletMessageConsumer(QUEUE_NAME);
  9. }
  10. }

此时,重启rabbitMq后,消息依然存在

发表评论

表情:
评论列表 (有 0 条评论,311人围观)

还没有评论,来说两句吧...

相关阅读

    相关 ActiveMQ消息持久

    前言 在前面我我们讲JMS规范的时候有简单的说过activeMQ的消息持久化,演示了如何来设置使消息能够持久化存储。 本次呢,我们将来深入了解activeMQ的消息持久

    相关 .消息持久

    当rabbitMq重启的时候,消息依然会丢失。 RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。但是,仍然有一个非

    相关 MQ持久消息

    ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,所以本节分析一下几种持久化方式: 一、持久化为文件 ActiveMQ默认...