RabbitMQ消息持久化、消息确认、消息重试、死信队列实战!

野性酷女 2022-10-22 07:47 109阅读 0赞

不知各位小伙伴是否听闻过:MQ消息持久化、消息的确认机制、消息的重试机制和死信队列这几个名词啊,如果没听说过的小伙伴可就要去好好补补课啦!
微信搜索:“每日一试” 或 “每日一面”关注公众号,每日收集大厂面试真题,每天学习新知识!
或者扫描下方二维码:
在这里插入图片描述

既然听说过,那你们可知这些又是如何实现的呢?
闲话不多说,接下来咱们进行实战!

1. 配置环境:

①安装Erlang环境包
②RabbitMQ官网:http://www.rabbitmq.com/
下载RabbitMQ软件包并安装,此文就不再一一赘述了

ps:若是需要以上详细安装步骤的可关注公众号留言

2. 编写SpringBoot测试模块

①新建SpringBoot项目
②pom.xml添加依赖

  1. <!-- rabbitMQ -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <!-- web -->
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-web</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-actuator</artifactId>
  14. </dependency>

③yml配置文件
在这里插入图片描述

④编写启动类

  1. @SpringBootApplication
  2. public class ApiApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(ApiApplication.class, args);
  5. }
  6. }

3. 消息的持久化

3.1配置类:FanoutCongif.java

创建一个队列:

  1. @Bean
  2. public Queue createQueue() {
  3. Queue queue = new Queue("test_queue", true, false, false, null);
  4. return queue;
  5. }

Queue构造方法有五个参数

  • 第一个参数:队列名称
  • 第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失,此参数默认是true,即默认持久化
  • 第三个参数:是否队列私有化,false则表示所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
  • 第四个参数:是否自动删除,false代表连接停掉后不会自动删除这个队列
  • 其他参数设置

创建一个交换机:

  1. @Bean
  2. public FanoutExchange createExchange() {
  3. FanoutExchange fanoutExchange = new FanoutExchange("test_fanoutExchange", true, false, null);
  4. return fanoutExchange;
  5. }

Exchange构造方法参数与Queue一样。

声明交换机与队列的绑定关系:

  1. @Bean
  2. public Binding createBinding() {
  3. return BindingBuilder.bind(createQueue()).to(createExchange());
  4. }

启动项目,会自动帮我们创建交换机以及队列

交换机、队列持久化:即MQ服务器关闭或者重启之后,交换机依然存在,队列以及队列中未曾被消费的数据依然存在

3.1.1 持久化验证

①通过RabbitMQ的控制台:localhost:15672登录查看
在这里插入图片描述
在这里插入图片描述
②进入到RabbitMQ的sbin目录,使用cmd命令:使用rabbitmqctl.bat list_queues命令查看队列以及队列的消息数量
在这里插入图片描述

3.2 消息生产者及消费者

消息生产者:RabbitProducerController.java

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. @GetMapping("/sendMes")
  4. public String sendMes(String msg) {
  5. rabbitTemplate.convertAndSend("test_fanoutExchange", "test_queue", msg, message -> {
  6. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  7. return message;
  8. },
  9. new CorrelationData(UUID.randomUUID().toString()));
  10. // rabbitTemplate.convertAndSend("test_fanoutExchange", "test_queue", msg);// 使用此代码也可发送消息
  11. log.info("生产消息:{}", msg);
  12. return "生产消息:" + msg;
  13. }

消息消费者:

  1. @RabbitListener(queues = "test_queue")
  2. // @RabbitHandler
  3. public void consumer(String msg, Channel channel, Message message) throws IOException {
  4. log.info("消费消息:{}", msg);
  5. }
3.2.1 测试:

使用浏览器或postman发送消息,可以看到后台输出日志,表名我们的消息已经生产并且被消费了
在这里插入图片描述
MQ可视化后台查看:可以看到,队列中没有消息,因为已经消费完了
在这里插入图片描述

ps:消息消费完了之后自动删除,是因为RabbitMQ默认是使用的自动确认机制,可以改为手动确认机制↓

4. 消息确认机制

我们知道RabbitMQ的消息确认机制有

  • 生产者—>Exchange(生产者)
  • Exchange—>Queue(生产者)
  • 消费者ACK(manual:手动确认 auto:自动确认(默认) none:不确认)

在yml配置文件添加代码:
在这里插入图片描述
好,此时我们重启项目,再次生产一条消息:
在这里插入图片描述
可以看到我们的消费者依然是正常的消费了消息,但是不要高兴的太早…我们来到控制台:
在这里插入图片描述
我K,为什么这里还是显示有一条消息,我明明已经消费了的啊,为什么还有…别急,这个时候我们再来重启一下项目:
在这里插入图片描述
WTF?又消费一次?这不就导致消息重复消费了吗?苍天啊…

别慌别慌,James来解答各位的疑惑,出现这种情况呢是因为我们刚才在yml配置文件将消息的确认机制改为了自动确认,而我们在消费者中并没有添加与确认相关的任何代码,我们改造一下生产者和消费者↓
生产者:RabbitProducerController.java

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. @Autowired
  4. private ConfirmCallbackService confirmCallbackService;
  5. @Autowired
  6. private ReturnCallbackService returnCallbackService;
  7. @GetMapping("/sendMes")
  8. public String sendMes(String msg) {
  9. /** * 确保消息发送失败后可以重新返回到队列中 * 注意:yml需要配置 publisher-returns: true */
  10. rabbitTemplate.setMandatory(true);
  11. /** * 消费者确认收到消息后,手动ack回执回调处理 */
  12. rabbitTemplate.setConfirmCallback(confirmCallbackService);
  13. /** * 消息投递到队列失败回调处理 */
  14. rabbitTemplate.setReturnCallback(returnCallbackService);
  15. rabbitTemplate.convertAndSend("test_fanoutExchange", "test_queue", msg, message -> {
  16. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  17. return message;
  18. },
  19. new CorrelationData(UUID.randomUUID().toString()));
  20. // rabbitTemplate.convertAndSend("test_fanoutExchange", "test_queue", msg);
  21. log.info("生产消息:{}", msg);
  22. return "生产消息:" + msg;
  23. }

Confirm确认回调:ConfirmCallbackService.java
交换机确认收到消息后,手动ack回执回调处理 yml需要设置publisher-confirms: true

  1. /** * 交换机确认收到消息后,手动ack回执回调处理 yml需要设置publisher-confirms: true */
  2. @Slf4j
  3. @Component
  4. public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
  5. @Override
  6. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  7. if (ack) {
  8. log.info("交换机已经确认收到,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack);
  9. } else {
  10. log.error("消息发送异常!:{}", cause);
  11. }
  12. }
  13. }

confirm方法参数详解:

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

Return确认回调:ReturnCallbackService.java
消息投递到队列失败回调处理

  1. /** * 消息投递到队列失败回调处理 */
  2. @Slf4j
  3. @Component
  4. public class ReturnCallbackService implements RabbitTemplate.ReturnCallback{
  5. @Override
  6. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  7. log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
  8. }
  9. }

returnedMessage方法参数详解:

  • message(消息体)
  • replyCode(响应code)
  • replyText(响应内容)
  • exchange(交换机)
  • routingKey(队列)

消息消费者:

  1. /** * 消费消息有三种回执方法,我们来分析一下每种方法的含义。 * @param msg * @param channel * @param message * @throws IOException */
  2. @RabbitListener(queues = "test_queue")
  3. //@RabbitHandler
  4. public void consumer(String msg, Channel channel, Message message) throws IOException {
  5. try {
  6. // int i = 10/0;
  7. log.info("消费消息:{}", msg);
  8. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  9. } catch (Exception e) {
  10. if (message.getMessageProperties().getRedelivered()) {
  11. log.error("消息已重复处理失败,拒绝再次接收...");
  12. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
  13. } else {
  14. log.error("消息即将再次返回队列处理...");
  15. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  16. }
  17. }
  18. }

在consumer方法添加:channel(信道)、message 两个参数。

消费消息有三种回执方法,我们来分析一下每种方法的含义。

1、basicAck
basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

  1. void basicAck(long deliveryTag, boolean multiple)
  • deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
  • multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个梨子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack
basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

  1. void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  • deliveryTag:表示消息投递序号。
  • multiple:是否批量确认。
  • requeue:值为 true 消息将重新入队列。

3、basicReject
basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

  1. void basicReject(long deliveryTag, boolean requeue)
  • deliveryTag:表示消息投递序号。
  • requeue:值为 true 消息将重新入队列。
4.1 消息确认机制的测试

好了,我们现在重启一下项目,再次生产一条消息:
在这里插入图片描述
可以看到,我们的消息已经正常投递到Exchange、Queue和正常消费了,我们看一下控制台:
在这里插入图片描述
可以看到消息也已经被删除了,ok,一切正常,完美!

5. 消息重试

yml配置文件:
在这里插入图片描述
修改一下消费者,手动抛出一个异常,因为消息若是被正常消费那肯定不需要重试了:

  1. @RabbitListener(queues = "test_queue")
  2. //@RabbitHandler
  3. public void consumer(String msg, Channel channel, Message message) throws IOException {
  4. try {
  5. int i = 10/0;
  6. log.info("消费消息:{}", msg);
  7. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  8. } catch (Exception e) {
  9. if (message.getMessageProperties().getRedelivered()) {
  10. log.error("消息已重复处理失败,拒绝再次接收...");
  11. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
  12. } else {
  13. log.error("消息即将再次返回队列处理...");
  14. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  15. }
  16. }
  17. }

ok,重新启动,生产一条消息,通过断点我们发现,消息并没有像我们配置文件中配置的那样重试3次,这是怎么回事呢?

是这样的,在我们的消费者中可以看到我将异常给try、catch起来了,那么这样的话呢,我们的异常被自己捕获那就不算异常了,我们来修改一下消费者:

  1. @RabbitListener(queues = "test_queue")
  2. // @RabbitHandler
  3. public void consumer(String msg, Channel channel, Message message) throws IOException {
  4. int i = 10/0;
  5. log.info("消费消息:{}", msg);
  6. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  7. }

这样我们通过断点可以发现我们的int i = 10/0;这样代码执行了3次。

PS:消息的重试也可以通过使用第三方的中间件,例如Redis、MongoDB来记录当前已经重试的次数↓

思考:即使重试了3次,可是代码还是出现异常,那怎么办?↓

  1. @RabbitHandler
  2. public void processHandler(String msg, Channel channel, Message message) throws IOException {
  3. try {
  4. int a = 1 / 0;
  5. log.info("消费消息:{}", msg);
  6. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  7. } catch (Exception e) {
  8. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  9. }
  10. }

出现了异常那就让消息重新回到队列里面,然后再次消费,是不是以为这是一个好办法?不…这样会死人的↓

在生产上,代码出现问题,99%是不会自动修复的,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。CPU瞬间打满,生产服务器直接挂掉…

经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。

这个怎么解决?

我们可以先对消息进行确认,然后再将异常的消息重新发送到队列,这样的话,这条消息就会出现在队列的尾部。

  1. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  2. // 重新发送消息到队尾
  3. // 参数:Exchange、routingKey、额外的设置属性、消息字节数组
  4. channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
  5. message.getMessageProperties().getReceivedRoutingKey(),
  6. null,
  7. JSON.toJSONBytes(msg));

但是这样也不是办法,这条消息还是会失败…

解答:这个时候我们可以借助MySQL或Redis等将消息持久化到数据库进行人工处理…

6. 死信队列

当我们设置的消息重试次数超过了之后,会将此消息转发到死信队列!
修改我们的Queue,配置类:FanoutCongif.java

  • 给队列增加x-dead-letter-exchange和x-dead-letter-routing-key参数,用于指定死信队列的路由和routingKey
  • 创建死信队列
  • 创建死信交换机
  • 绑定死信交换机与死信队列的关系

    @Bean
    public Queue createQueue() {

    1. // 给队列增加x-dead-letter-exchange和x-dead-letter-routing-key参数,用于指定死信队列的路由和routingKey
    2. Map<String, Object> args = new HashMap<String, Object>();
    3. args.put("x-dead-letter-exchange", "test_dead_letter_Exchange");
    4. args.put("x-dead-letter-routing-key", "test_dead_letter_.#");
    5. Queue queue = new Queue("test_queue", true, false, false, args );
    6. return queue;

    }

    // ========== dead_letter =============
    @Bean
    public Queue createDeadLetterQueue() {

    1. return new Queue("test_dead_letter_queue", false, false, false, null);

    }

    @Bean
    public FanoutExchange createDeadLetterExchange() {

    1. FanoutExchange fanoutExchange = new FanoutExchange("test_dead_letter_exchange", true, false, null);
    2. return fanoutExchange;

    }

    @Bean
    public Binding createDeadLetterBinding() {

    1. return BindingBuilder.bind(createDeadLetterQueue()).to(createDeadLetterExchange());

    }

配置MessageRecoverer对异常消息进行处理,此处理会在listener.retry次数尝试完并还是抛出异常的情况下才会调用
若不加此配置,则会默认使用:RejectAndDontRequeueRecoverer.java,实现仅仅是将异常打印抛出

  1. // 配置MessageRecoverer对异常消息进行处理,此处理会在listener.retry次数尝试完并还是抛出异常的情况下才会调用
  2. // 若不加此配置,则会默认使用:RejectAndDontRequeueRecoverer.java,实现仅仅是将异常打印抛出
  3. @Bean
  4. public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
  5. return new RepublishMessageRecoverer(rabbitTemplate, "test_dead_letter_exchange", "test_dead_letter_.#");
  6. }

创建一个死信队列的消费者:

  1. @Slf4j
  2. @Component
  3. @RabbitListener(queues = "test_dead_letter_queue")
  4. public class ConsumerController {
  5. @RabbitHandler
  6. public void consumer(String msg, Channel channel, Message message) throws IOException {
  7. log.info("欢迎来到死信队列!msg:{}", msg);
  8. // TODO 由于设置了消息手动确认机制,故此,需要手动ACK,否则死信队列里面的消息会一直存在->重复消费
  9. // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  10. }
  11. }

启动项目,生产一条消息,查看日志:
在这里插入图片描述

可以看到,消息重试了三次,还是失败后进入了死信队列…

那么在死信队列里面可以对我们的异常消息进行MySQL\Redis持久化,然后人工处理等等操作…

好啦,今天的分享就到这里啦,不知道对小伙伴们有没有帮助呢?
喜欢的同学可以
微信搜索:“每日一试” 或 “每日一面”关注公众号,每日收集大厂面试真题,每天学习新知识!
或者扫描下方二维码:
在这里插入图片描述

创作不易,多多关照!

发表评论

表情:
评论列表 (有 0 条评论,109人围观)

还没有评论,来说两句吧...

相关阅读