四.消息的持久化
当rabbitMq重启的时候,消息依然会丢失。
RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上)会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html)。
提供者Producer和消费者Consomer的pom.xml和上一章的一样。
一.提供者Consumer
1.发送消息的类:MessageSender.java
package com.rabbitmq.producer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MessageSender {
private Logger logger = LoggerFactory.getLogger(MessageSender.class);
//声明一个列队名字
private final static String QUEUE_NAME = "hello";
/** * 测试rabbit的消息持久化 * RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ, * 消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上) * 会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html) * @param queueName * @return */
public boolean sendMessageDurable(String message){
//new一个rabbitmq的连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置需要连接的RabbitMQ的机器的地址,这里指向本机
factory.setHost("localhost");
//连接器
Connection connection = null;
//通道
Channel channel = null;
try {
//尝试获取一个连接
connection = factory.newConnection();
//尝试创建一个通道
channel = connection.createChannel();
/*声明一个列队: * 1.queue队列的名字 * 2.是否持久化,是否持久化 为true则在rabbitMQ重启后生存, * 3.自动删除,在最后一个连接断开后删除队列 * 4.其他参数 * */
//注意,RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,对于试图这么做的程序,会报错,所以,改动之前代码之前,要在控制台中把原来的队列删除
//IO异常:java.io.IOException
boolean durable = true;//是否持久化消息,无论是提供者还是消费者,都可以设置,
//不过RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,所有提供者或者
//消费者谁先声明后者不能重新声明
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
/*发布消息,注意这里调用了getBytes(),发送的其实是byte数组,接收方收到消息后,需要重新组装成String * 1.交换模式 * 2.控制消息发送到哪个队列 * 3.其他参数 * 4.body 消息,byte数组 * */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
logger.info("已发送:"+message);
//关闭通道和链接(先关闭通道在关闭连接)
channel.close();
connection.close();
} catch (IOException e) {
logger.error("IO异常:"+e);
return false;
} catch (TimeoutException e){
logger.error("超时异常:"+e);
return false;
}
return true;
}
}
2.测试发送消息的Mian:DurableMessageMain.java
package com.rabbitmq.main;
import com.rabbitmq.producer.MessageSender;
public class DurableMessageMain {
/** * 测试消息持久化 * @param args */
public static void main(String[] args) {
MessageSender messageSender = new MessageSender();
messageSender.sendMessageDurable("hellow tiglle");
}
}
二.消费者Consumer
1.接收持久化消息的类
package com.rabbitmq.consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class MessageRecive {
private Logger logger = LoggerFactory.getLogger(MessageRecive.class);
/** * 测试rabbit的消息持久化 * RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ, * 消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ收到消息还没来得及存到硬盘上) * 会导致消息丢失,如果需要严格的控制,可以参考官方文档(https://www.rabbitmq.com/confirms.html) * @param queueName * @return */
public boolean durabletMessageConsumer(String queueName){
//连接rabbitmq
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection connection = factory.newConnection();
//解决内部类只能访问final修饰的局部变量
final Channel channel = connection.createChannel();
//声明消费的queue
//注意,RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,对于试图这么做的程序,会报错,所以,改动之前代码之前,要在控制台中把原来的队列删除
//IO异常:java.io.IOException
boolean durable = true;//是否持久化消息,无论是提供者还是消费者,都可以设置,
//不过RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,所有提供者或者
//消费者谁先声明后者不能重新声明
channel.queueDeclare(queueName,durable,false,false,null);
//在消息确认之前,不在处理其他消息
//prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
channel.basicQos(1);
//这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String(局部内部类)
Consumer consumer = new DefaultConsumer(channel){
//重写父类方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
String message = new String(body, "UTF-8");
logger.info("接收到:" + message);
//休眠10秒,模拟10秒处理事件
try {
logger.info("开始处理消息(休眠)......");
Thread.sleep(10000);
System.out.println("处理完毕!");
} catch (Exception e) {
// TODO: handle exception
}finally {
//手动应答,告诉服务器可以删除消息,否则不删除或给其他消费者
/** * @param deliveryTag the tag from the received 这个是RabbitMQ用来区分消息的,文档在这(https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.deliver.delivery-tag) * @param multiple true to acknowledge all messages up to and 为true的话,确认所有消息,为false只确认当前消息 */
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//上面是声明消费者,这里用 声明的消费者 消费 列队的消息
System.out.println("开始等待提供者的消息....");
//关闭自动应答,改为手动应答,很重要
boolean autoAsk = false;
channel.basicConsume(queueName, autoAsk,consumer);
//这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
} catch (Exception e) {
logger.error("出现异常:"+e);
return false;
}
return true;
}
}
2.测试接收持久化消息的Mian方法:
package com.rabbitmq.main;
import com.rabbitmq.consumer.MessageRecive;
public class durableMessageMain {
//从哪个列队取消息
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
MessageRecive messageRecive = new MessageRecive();
messageRecive.durabletMessageConsumer(QUEUE_NAME);
}
}
此时,重启rabbitMq后,消息依然存在
还没有评论,来说两句吧...