RabbitMQ消息持久化、消息确认、消息重试、死信队列实战!
不知各位小伙伴是否听闻过:MQ消息持久化、消息的确认机制、消息的重试机制和死信队列这几个名词啊,如果没听说过的小伙伴可就要去好好补补课啦!
微信搜索:“每日一试” 或 “每日一面”关注公众号,每日收集大厂面试真题,每天学习新知识!
或者扫描下方二维码:
既然听说过,那你们可知这些又是如何实现的呢?
闲话不多说,接下来咱们进行实战!
1. 配置环境:
①安装Erlang环境包
②RabbitMQ官网:http://www.rabbitmq.com/
下载RabbitMQ软件包并安装,此文就不再一一赘述了
ps:若是需要以上详细安装步骤的可关注公众号留言
2. 编写SpringBoot测试模块
①新建SpringBoot项目
②pom.xml添加依赖
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
③yml配置文件
④编写启动类
@SpringBootApplication
public class ApiApplication {
public static void main(String[] args) {
SpringApplication.run(ApiApplication.class, args);
}
}
3. 消息的持久化
3.1配置类:FanoutCongif.java
创建一个队列:
@Bean
public Queue createQueue() {
Queue queue = new Queue("test_queue", true, false, false, null);
return queue;
}
Queue构造方法有五个参数
- 第一个参数:队列名称
- 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失,此参数默认是true,即默认持久化
- 第三个参数:是否队列私有化,false则表示所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
- 第四个参数:是否自动删除,false代表连接停掉后不会自动删除这个队列
- 其他参数设置
创建一个交换机:
@Bean
public FanoutExchange createExchange() {
FanoutExchange fanoutExchange = new FanoutExchange("test_fanoutExchange", true, false, null);
return fanoutExchange;
}
Exchange构造方法参数与Queue一样。
声明交换机与队列的绑定关系:
@Bean
public Binding createBinding() {
return BindingBuilder.bind(createQueue()).to(createExchange());
}
启动项目,会自动帮我们创建交换机以及队列
交换机、队列持久化:即MQ服务器关闭或者重启之后,交换机依然存在,队列以及队列中未曾被消费的数据依然存在
3.1.1 持久化验证
①通过RabbitMQ的控制台:localhost:15672登录查看
②进入到RabbitMQ的sbin目录,使用cmd命令:使用rabbitmqctl.bat list_queues命令查看队列以及队列的消息数量
3.2 消息生产者及消费者
消息生产者:RabbitProducerController.java
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMes")
public String sendMes(String msg) {
rabbitTemplate.convertAndSend("test_fanoutExchange", "test_queue", msg, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
// rabbitTemplate.convertAndSend("test_fanoutExchange", "test_queue", msg);// 使用此代码也可发送消息
log.info("生产消息:{}", msg);
return "生产消息:" + msg;
}
消息消费者:
@RabbitListener(queues = "test_queue")
// @RabbitHandler
public void consumer(String msg, Channel channel, Message message) throws IOException {
log.info("消费消息:{}", msg);
}
3.2.1 测试:
使用浏览器或postman发送消息,可以看到后台输出日志,表名我们的消息已经生产并且被消费了
MQ可视化后台查看:可以看到,队列中没有消息,因为已经消费完了
ps:消息消费完了之后自动删除,是因为RabbitMQ默认是使用的自动确认机制,可以改为手动确认机制↓
4. 消息确认机制
我们知道RabbitMQ的消息确认机制有
- 生产者—>Exchange(生产者)
- Exchange—>Queue(生产者)
- 消费者ACK(manual:手动确认 auto:自动确认(默认) none:不确认)
在yml配置文件添加代码:
好,此时我们重启项目,再次生产一条消息:
可以看到我们的消费者依然是正常的消费了消息,但是不要高兴的太早…我们来到控制台:
我K,为什么这里还是显示有一条消息,我明明已经消费了的啊,为什么还有…别急,这个时候我们再来重启一下项目:
WTF?又消费一次?这不就导致消息重复消费了吗?苍天啊…
别慌别慌,James来解答各位的疑惑,出现这种情况呢是因为我们刚才在yml配置文件将消息的确认机制改为了自动确认,而我们在消费者中并没有添加与确认相关的任何代码,我们改造一下生产者和消费者↓
生产者:RabbitProducerController.java
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
@GetMapping("/sendMes")
public String sendMes(String msg) {
/** * 确保消息发送失败后可以重新返回到队列中 * 注意:yml需要配置 publisher-returns: true */
rabbitTemplate.setMandatory(true);
/** * 消费者确认收到消息后,手动ack回执回调处理 */
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/** * 消息投递到队列失败回调处理 */
rabbitTemplate.setReturnCallback(returnCallbackService);
rabbitTemplate.convertAndSend("test_fanoutExchange", "test_queue", msg, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
// rabbitTemplate.convertAndSend("test_fanoutExchange", "test_queue", msg);
log.info("生产消息:{}", msg);
return "生产消息:" + msg;
}
Confirm确认回调:ConfirmCallbackService.java
交换机确认收到消息后,手动ack回执回调处理 yml需要设置publisher-confirms: true
/** * 交换机确认收到消息后,手动ack回执回调处理 yml需要设置publisher-confirms: true */
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("交换机已经确认收到,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack);
} else {
log.error("消息发送异常!:{}", cause);
}
}
}
confirm方法参数详解:
- correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
- ack:消息投递到broker 的状态,true表示成功。
- cause:表示投递失败的原因。
Return确认回调:ReturnCallbackService.java
消息投递到队列失败回调处理
/** * 消息投递到队列失败回调处理 */
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback{
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
returnedMessage方法参数详解:
- message(消息体)
- replyCode(响应code)
- replyText(响应内容)
- exchange(交换机)
- routingKey(队列)
消息消费者:
/** * 消费消息有三种回执方法,我们来分析一下每种方法的含义。 * @param msg * @param channel * @param message * @throws IOException */
@RabbitListener(queues = "test_queue")
//@RabbitHandler
public void consumer(String msg, Channel channel, Message message) throws IOException {
try {
// int i = 10/0;
log.info("消费消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
在consumer方法添加:channel(信道)、message 两个参数。
消费消息有三种回执方法,我们来分析一下每种方法的含义。
1、basicAck
basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
void basicAck(long deliveryTag, boolean multiple)
- deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
- multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
举个梨子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
2、basicNack
basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag:表示消息投递序号。
- multiple:是否批量确认。
- requeue:值为 true 消息将重新入队列。
3、basicReject
basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
- deliveryTag:表示消息投递序号。
- requeue:值为 true 消息将重新入队列。
4.1 消息确认机制的测试
好了,我们现在重启一下项目,再次生产一条消息:
可以看到,我们的消息已经正常投递到Exchange、Queue和正常消费了,我们看一下控制台:
可以看到消息也已经被删除了,ok,一切正常,完美!
5. 消息重试
yml配置文件:
修改一下消费者,手动抛出一个异常,因为消息若是被正常消费那肯定不需要重试了:
@RabbitListener(queues = "test_queue")
//@RabbitHandler
public void consumer(String msg, Channel channel, Message message) throws IOException {
try {
int i = 10/0;
log.info("消费消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
ok,重新启动,生产一条消息,通过断点我们发现,消息并没有像我们配置文件中配置的那样重试3次,这是怎么回事呢?
是这样的,在我们的消费者中可以看到我将异常给try、catch起来了,那么这样的话呢,我们的异常被自己捕获那就不算异常了,我们来修改一下消费者:
@RabbitListener(queues = "test_queue")
// @RabbitHandler
public void consumer(String msg, Channel channel, Message message) throws IOException {
int i = 10/0;
log.info("消费消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
这样我们通过断点可以发现我们的int i = 10/0;这样代码执行了3次。
PS:消息的重试也可以通过使用第三方的中间件,例如Redis、MongoDB来记录当前已经重试的次数↓
思考:即使重试了3次,可是代码还是出现异常,那怎么办?↓
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
int a = 1 / 0;
log.info("消费消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
出现了异常那就让消息重新回到队列里面,然后再次消费,是不是以为这是一个好办法?不…这样会死人的↓
在生产上,代码出现问题,99%是不会自动修复的,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。CPU瞬间打满,生产服务器直接挂掉…
经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。
消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。
这个怎么解决?
我们可以先对消息进行确认,然后再将异常的消息重新发送到队列,这样的话,这条消息就会出现在队列的尾部。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
// 参数:Exchange、routingKey、额外的设置属性、消息字节数组
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(),
null,
JSON.toJSONBytes(msg));
但是这样也不是办法,这条消息还是会失败…
解答:这个时候我们可以借助MySQL或Redis等将消息持久化到数据库进行人工处理…
6. 死信队列
当我们设置的消息重试次数超过了之后,会将此消息转发到死信队列!
修改我们的Queue,配置类:FanoutCongif.java
- 给队列增加x-dead-letter-exchange和x-dead-letter-routing-key参数,用于指定死信队列的路由和routingKey
- 创建死信队列
- 创建死信交换机
绑定死信交换机与死信队列的关系
@Bean
public Queue createQueue() {// 给队列增加x-dead-letter-exchange和x-dead-letter-routing-key参数,用于指定死信队列的路由和routingKey
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "test_dead_letter_Exchange");
args.put("x-dead-letter-routing-key", "test_dead_letter_.#");
Queue queue = new Queue("test_queue", true, false, false, args );
return queue;
}
// ========== dead_letter =============
@Bean
public Queue createDeadLetterQueue() {return new Queue("test_dead_letter_queue", false, false, false, null);
}
@Bean
public FanoutExchange createDeadLetterExchange() {FanoutExchange fanoutExchange = new FanoutExchange("test_dead_letter_exchange", true, false, null);
return fanoutExchange;
}
@Bean
public Binding createDeadLetterBinding() {return BindingBuilder.bind(createDeadLetterQueue()).to(createDeadLetterExchange());
}
配置MessageRecoverer对异常消息进行处理,此处理会在listener.retry次数尝试完并还是抛出异常的情况下才会调用
若不加此配置,则会默认使用:RejectAndDontRequeueRecoverer.java,实现仅仅是将异常打印抛出
// 配置MessageRecoverer对异常消息进行处理,此处理会在listener.retry次数尝试完并还是抛出异常的情况下才会调用
// 若不加此配置,则会默认使用:RejectAndDontRequeueRecoverer.java,实现仅仅是将异常打印抛出
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "test_dead_letter_exchange", "test_dead_letter_.#");
}
创建一个死信队列的消费者:
@Slf4j
@Component
@RabbitListener(queues = "test_dead_letter_queue")
public class ConsumerController {
@RabbitHandler
public void consumer(String msg, Channel channel, Message message) throws IOException {
log.info("欢迎来到死信队列!msg:{}", msg);
// TODO 由于设置了消息手动确认机制,故此,需要手动ACK,否则死信队列里面的消息会一直存在->重复消费
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
启动项目,生产一条消息,查看日志:
可以看到,消息重试了三次,还是失败后进入了死信队列…
那么在死信队列里面可以对我们的异常消息进行MySQL\Redis持久化,然后人工处理等等操作…
好啦,今天的分享就到这里啦,不知道对小伙伴们有没有帮助呢?
喜欢的同学可以
微信搜索:“每日一试” 或 “每日一面”关注公众号,每日收集大厂面试真题,每天学习新知识!
或者扫描下方二维码:
创作不易,多多关照!
还没有评论,来说两句吧...