RabbitMQ--SpringBoot--消息确认/备份交换机/死信队列/延迟队列/重试

分手后的思念是犯贱 2022-09-12 03:50 345阅读 0赞

原文网址:RabbitMQ—SpringBoot—消息确认/备份交换机/死信队列/延迟队列/重试_IT利刃出鞘的博客-CSDN博客

消息确认

生产者消息确认

简介

发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递。
如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出;如果是镜像队列,所有镜像接受成功后发确认消息。

流程

  1. 如果消息没有到达exchange,则confirm回调,ack=false
  2. 如果消息到达exchange,则confirm回调,ack=true
  3. exchange到queue成功,则不回调return
  4. exchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,这样消息就丢了)

配置方法

application.yml

  1. # 发送者开启 confirm 确认机制
  2. spring.rabbitmq.publisher-confirms=true
  3. # 发送者开启 return 确认机制
  4. spring.rabbitmq.publisher-returns=true

ConfirmCallback

ConfirmCallback:消息只要被 RabbitMQ broker 接收到就会触发confirm方法。

  1. @Slf4j
  2. @Component
  3. public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
  4. @Override
  5. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  6. if (!ack) {
  7. log.error("confirm==>发送到broker失败\r\n" +
  8. "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
  9. correlationData, ack, cause);
  10. } else {
  11. log.info("confirm==>发送到broker成功\r\n" +
  12. "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
  13. correlationData, ack, cause);
  14. }
  15. }
  16. }

correlationData:对象内部有id (消息的唯一性)和Message。
(若ack为false,则Message不为null,可将Message数据 重新投递;若ack是true,则correlationData为null)
ack:消息投递到exchange 的状态,true表示成功。
cause:表示投递失败的原因。 (若ack为false,则cause不为null;若ack是true,则cause为null)

给每一条信息添加一个dataId,放在CorrelationData,这样在RabbitConfirmCallback返回失败时可以知道哪个消息失败。

  1. public void send(String dataId, String exchangeName, String rountingKey, String message){
  2. CorrelationData correlationData = new CorrelationData();
  3. correlationData.setId(dataId);
  4. rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData);
  5. }
  6. public String receive(String queueName){
  7. return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));
  8. }

2.1版本开始,CorrelationData对象具有ListenableFuture,可用于获取结果,而不是在rabbitTemplate上使用ConfirmCallback。

  1. CorrelationData cd1 = new CorrelationData();
  2. this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
  3. assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

ReturnCallback

ReturnCallback:如果消息未能投递到目标 queue 里将触发returnedMessage方法。
若向 queue 投递消息未成功,可记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

注意:需要rabbitTemplate.setMandatory(true);

当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者。当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。

  1. @Slf4j
  2. @Component
  3. public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
  4. @Override
  5. public void returnedMessage(Message message, int replyCode, String replyText,
  6. String exchange, String routingKey) {
  7. log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
  8. "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
  9. message, replyCode, replyText, exchange, routingKey);
  10. }
  11. }

message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

注册ConfirmCallback和ReturnCallback

整合后的写法

  1. package com.example.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  5. import org.springframework.amqp.rabbit.connection.CorrelationData;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import javax.annotation.PostConstruct;
  11. @Slf4j
  12. @Configuration
  13. public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
  14. @Bean
  15. RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  16. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  17. rabbitTemplate.setMandatory(true);
  18. rabbitTemplate.setConfirmCallback(this);
  19. rabbitTemplate.setReturnCallback(this);
  20. return rabbitTemplate;
  21. }
  22. // 下边这样写也可以
  23. // @Autowired
  24. // private RabbitTemplate rabbitTemplate;
  25. // @PostConstruct
  26. // public void init() {
  27. // rabbitTemplate.setMandatory(true);
  28. // rabbitTemplate.setReturnCallback(this);
  29. // rabbitTemplate.setConfirmCallback(this);
  30. // }
  31. @Override
  32. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  33. if (!ack) {
  34. log.error("confirm==>发送到broker失败\r\n" +
  35. "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
  36. correlationData, ack, cause);
  37. } else {
  38. log.info("confirm==>发送到broker成功\r\n" +
  39. "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
  40. correlationData, ack, cause);
  41. }
  42. }
  43. @Override
  44. public void returnedMessage(Message message, int replyCode, String replyText,
  45. String exchange, String routingKey) {
  46. log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
  47. "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
  48. message, replyCode, replyText, exchange, routingKey);
  49. }
  50. }

消费者消息确认

简介
























确认方式 简介 详述
auto(默认 根据消息消费的情况,智能判定

若消费者抛出异常,则mq不会收到确认消息,mq会一直此消息发出去。

若消费者没有抛出异常,则mq会收到确认消息,mq不会再次将此消息发出去。

若消费者在消费时所在服务挂了,mq不会再次将此消息发出去。

none mq发出消息后直接确认消息
manual 消费端手动确认消息

        消费者调用 ack、nack、reject 几种方法进行确认,可以在业务失败后进行一些操作,如果消息未被 ACK 则消息还会存在于MQ,mq会一直将此消息发出去。。

        如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限。

只要消息没有被消费者确认(包括没有自动确认),会导致消息一直被失败消费,死循环导致消耗大量资源。正确的处理方式是:发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。

消息确认三种方式配置方法

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

手动确认三种方式(basicAck,basicNack,basicReject**)**

basicAck //表示成功确认,使用此回执方法后,消息会被RabbitMQ broker 删除。

函数原型:void basicAck(long deliveryTag, boolean multiple)

deliveryTag:消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
示例: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

实例:

  1. @RabbitHandler
  2. public void process(String content, Channel channel, Message message){
  3. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  4. }

basicNack //表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

函数原型:void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。
multiple:是否批量确认。
requeue:值为 true 消息将重新入队列。

basicReject //拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

函数原型:void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。
requeue:值为 true 消息将重新入队列。

备份交换机

简介

生产者发送消息,如果路由错误不能到达指定队列,就路由到备胎队列消费,这样做可以保证未被路由的消息不会丢失。其实保证消息不会丢失还可以通过消息的回调方法,添加ReturnListener的编程逻辑,但是这样做生产者的代码会复杂写,所以我们使用备份交换器实现。

实例

application.yml

  1. server:
  2. # port: 9100
  3. port: 9101
  4. spring:
  5. application:
  6. # name: demo-rabbitmq-sender
  7. name: demo-rabbitmq-receiver
  8. rabbitmq:
  9. host: localhost
  10. port: 5672
  11. username: admin
  12. password: 123456
  13. # virtualHost: /
  14. publisher-confirms: true
  15. publisher-returns: true
  16. # listener:
  17. # simple:
  18. # acknowledge-mode: manual
  19. # direct:
  20. # acknowledge-mode: manual

MQ配置

  1. package com.example.config;
  2. import org.springframework.amqp.core.AmqpAdmin;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.ExchangeBuilder;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. @Configuration
  10. public class RabbitRouterConfig {
  11. public static final String QUEUE_HELLO = "Queue@hello";
  12. public static final String QUEUE_HI = "Queue@hi";
  13. public static final String QUEUE_UNROUTE = "Queue@unroute";
  14. public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";
  15. public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute";
  16. public static final String ROUTINGKEY_HELLOS = "hello.#";
  17. @Autowired
  18. AmqpAdmin amqpAdmin;
  19. @Bean
  20. Object initBindingTest() {
  21. amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).build());
  22. amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME).durable(true)
  23. .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE).build());
  24. amqpAdmin.declareQueue(new Queue(QUEUE_HI, true));
  25. amqpAdmin.declareQueue(new Queue(QUEUE_HELLO, true));
  26. amqpAdmin.declareQueue(new Queue(QUEUE_UNROUTE, true));
  27. amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
  28. EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));
  29. amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE,
  30. EXCHANGE_FANOUT_UNROUTE, "", null));
  31. return new Object();
  32. }
  33. }

控制器

  1. package com.example.controller;
  2. import com.example.config.RabbitRouterConfig;
  3. import com.example.mq.Sender;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.PostMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.time.LocalDateTime;
  8. @RestController
  9. public class HelloController {
  10. @Autowired
  11. private Sender sender;
  12. @PostMapping("/hi")
  13. public void hi() {
  14. sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());
  15. }
  16. @PostMapping("/hello1")
  17. public void hello1() {
  18. sender.send("hello.a", "hello1 message:" + LocalDateTime.now());
  19. }
  20. @PostMapping("/hello2")
  21. public void hello2() {
  22. sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now());
  23. }
  24. @PostMapping("/ae")
  25. public void aeTest() {
  26. sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now());
  27. }
  28. }

发送器

  1. package com.example.mq;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Date;
  6. @Component
  7. public class Sender {
  8. @Autowired
  9. private AmqpTemplate rabbitTemplate;
  10. public void send(String routingKey, String message) {
  11. this.rabbitTemplate.convertAndSend(routingKey, message);
  12. }
  13. public void send(String exchange, String routingKey, String message) {
  14. this.rabbitTemplate.convertAndSend(exchange, routingKey, message);
  15. }
  16. }

接收器

  1. package com.example.mq;
  2. import com.example.config.RabbitRouterConfig;
  3. import com.rabbitmq.client.Channel;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.messaging.handler.annotation.Payload;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class Receiver {
  10. @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
  11. public void hi(String payload) {
  12. System.out.println ("Receiver(hi) : " + payload);
  13. }
  14. @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
  15. public void hello(String hello) throws InterruptedException {
  16. System.out.println ("Receiver(hello) : " + hello);
  17. Thread.sleep(5 * 1000);
  18. System.out.println("(hello):sleep over");
  19. }
  20. @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)
  21. public void unroute(String hello) throws InterruptedException {
  22. System.out.println ("Receiver(unroute) : " + hello);
  23. Thread.sleep(5 * 1000);
  24. System.out.println("(unroute):sleep over");
  25. }
  26. // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
  27. // public void hiAll(@Payload String payload, Message message, Channel channel) {
  28. // System.out.println("Receiver(hi):");
  29. // System.out.println("payload:" + payload);
  30. // System.out.println("message:" + message);
  31. // System.out.println("channel:" + channel);
  32. // }
  33. // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
  34. // public void helloAll(@Payload String payload, Message message, Channel channel) {
  35. // System.out.println("Receiver(hello):");
  36. // System.out.println("payload:" + payload);
  37. // System.out.println("message:" + message);
  38. // System.out.println("channel:" + channel);
  39. // }
  40. }

测试

分别启动发送者和接收者,然后访问:Node Exporter

结果:

接收者打印:

  1. Receiver(unroute) : ae message:2020-11-23T17:47:13.198
  2. (unroute):sleep over

死信/延迟 队列

其他网址

《RabbitMQ实战指南》=> 第4章 RabbitMQ进阶

简介

默认不会超时。

实例代码

路由配置

  1. package com.example.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitRouterConfig {
  8. public static final String EXCHANGE_TOPIC_WELCOME = "Exchange@topic.welcome";
  9. public static final String EXCHANGE_FANOUT_UNROUTE = "Exchange@fanout.unroute";
  10. public static final String EXCHANGE_TOPIC_DELAY = "Exchange@topic.delay";
  11. public static final String ROUTINGKEY_HELLOS = "hello.#";
  12. public static final String ROUTINGKEY_DELAY = "delay.#";
  13. public static final String QUEUE_HELLO = "Queue@hello";
  14. public static final String QUEUE_HI = "Queue@hi";
  15. public static final String QUEUE_UNROUTE = "Queue@unroute";
  16. public static final String QUEUE_DELAY = "Queue@delay";
  17. public static final Integer TTL_QUEUE_MESSAGE = 5000;
  18. @Autowired
  19. AmqpAdmin amqpAdmin;
  20. @Bean
  21. Object initBindingTest() {
  22. amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build());
  23. amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build());
  24. amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)
  25. .durable(true)
  26. .autoDelete()
  27. .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE)
  28. .build());
  29. amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build());
  30. amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO)
  31. .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY)
  32. .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY)
  33. .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE)
  34. .build());
  35. amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build());
  36. amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build());
  37. amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
  38. EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));
  39. amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE,
  40. EXCHANGE_FANOUT_UNROUTE, "", null));
  41. amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE,
  42. EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null));
  43. return new Object();
  44. }
  45. }

控制器

  1. package com.example.controller;
  2. import com.example.config.RabbitRouterConfig;
  3. import com.example.mq.Sender;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.PostMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.time.LocalDateTime;
  8. @RestController
  9. public class HelloController {
  10. @Autowired
  11. private Sender sender;
  12. @PostMapping("/hi")
  13. public void hi() {
  14. sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());
  15. }
  16. @PostMapping("/hello1")
  17. public void hello1() {
  18. sender.send("hello.a", "hello1 message:" + LocalDateTime.now());
  19. }
  20. @PostMapping("/hello2")
  21. public void hello2() {
  22. sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now());
  23. }
  24. @PostMapping("/ae")
  25. public void aeTest() {
  26. sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now());
  27. }
  28. }

发送器

  1. package com.example.mq;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Date;
  6. @Component
  7. public class Sender {
  8. @Autowired
  9. private AmqpTemplate rabbitTemplate;
  10. public void send(String routingKey, String message) {
  11. this.rabbitTemplate.convertAndSend(routingKey, message);
  12. }
  13. public void send(String exchange, String routingKey, String message) {
  14. this.rabbitTemplate.convertAndSend(exchange, routingKey, message);
  15. }
  16. }

接收器

  1. package com.example.mq;
  2. import com.example.config.RabbitRouterConfig;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class Receiver {
  7. @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
  8. public void hi(String payload) {
  9. System.out.println ("Receiver(hi) : " + payload);
  10. }
  11. // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
  12. // public void hello(String hello) throws InterruptedException {
  13. // System.out.println ("Receiver(hello) : " + hello);
  14. // Thread.sleep(5 * 1000);
  15. // System.out.println("(hello):sleep over");
  16. // }
  17. //
  18. // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)
  19. // public void unroute(String hello) throws InterruptedException {
  20. // System.out.println ("Receiver(unroute) : " + hello);
  21. // Thread.sleep(5 * 1000);
  22. // System.out.println("(unroute):sleep over");
  23. // }
  24. @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY)
  25. public void delay(String hello) throws InterruptedException {
  26. System.out.println ("Receiver(delay) : " + hello);
  27. Thread.sleep(5 * 1000);
  28. System.out.println("(delay):sleep over");
  29. }
  30. }

application.yml

  1. server:
  2. # port: 9100
  3. port: 9101
  4. spring:
  5. application:
  6. # name: demo-rabbitmq-sender
  7. name: demo-rabbitmq-receiver
  8. rabbitmq:
  9. host: localhost
  10. port: 5672
  11. username: admin
  12. password: 123456
  13. # virtualHost: /
  14. publisher-confirms: true
  15. publisher-returns: true
  16. # listener:
  17. # simple:
  18. # acknowledge-mode: manual
  19. # direct:
  20. # acknowledge-mode: manual

实例测试

分别启动发送者和接收者。

访问:Node Exporter

五秒钟后输出:

  1. Receiver(delay) : hello2 message:2020-11-27T09:30:51.548
  2. (delay):sleep over

重试

简介

默认情况下,如果消费者程序出现异常情况, Rabbitmq 会自动实现补偿机制(也就是重试机制)。

@RabbitListener底层使用AOP进行拦截,如果程序没有抛出异常,自动提交事务。 如果Aop使用异常通知拦截获取异常信息的话 , 自动实现补偿机制,该消息会一直缓存在Rabbitmq服务器端进行重放,一直重试到不抛出异常为准。

一般来说默认5s重试一次,可以修改重试策略,消费者配置(重试5次,不行就放弃):

  1. listener:
  2. simple:
  3. retry:
  4. # 开启消费者重试(默认开启)
  5. enabled: true
  6. # 最大重试次数(默认无数次)
  7. max-attempts: 5
  8. # 重试间隔次数
  9. initial-interval: 3000

长度

其他网址

rabbitmq官网:最大长度

RabbitMQ 队列消息的条数限制、队列字节长度限制、队列溢出行为方式_From Zero To Hero-CSDN博客_rabbitmq 队列数量上限
rabbitmq 限制队列长度和总字节数 | 大专栏

队列长度限制

  • 队列的最大长度可以限制为一组消息数或一组字节数(忽略消息属性和其他开销的所有消息体长度总和),或者两者兼有。
  • 默认情况下,rabbitmq中的queue的最大长度和总字节数不受限制的(仅受全局内存,磁盘阈值的影响)。
  • 对于任何给定的队列,最大长度(任一类型)可以由客户端使用队列的参数来定义,也可以在服务器中使用配置策略(policies)来定义。在策略和参数都指定最大长度的情况下,将应用两个值中的较小值。
  • 队列长度可以使用 operator policies 强制设置。
  • 在所有情况下,都使用 就绪 消息的数量;未确认的消息不计入限制。
  • rabbitmqctl list_queues 中的字段 messages_ready, message_bytes_ready 以及管理 API 展示的即为被限制的值。

默认最大队列长度限制行为

当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列前面丢弃或 dead-letter 消息(即队列中最早的消息)。要修改这种行为,请使用下面描述的 overflow 设置。

队列溢出行为

使用溢出设置来配置队列溢出行为。如果 overflow 设置为 reject-publish,则最近发布的消息将被丢弃。此外,如果 发布者确认 已启用,将通过 basic.nack 消息对发布者进行拒绝通知。如果一条消息被路由到多个队列并被其中至少一个队列拒绝,该信道将通过 basic.nack 通知发布者。该消息仍将被发布到可以将其排队的所有其他队列。

使用配置定义最大队列长度

要使用配置指定最大长度,请将关键词 max-length 和 / 或 max-length-bytes 添加到配置定义中。例如:


















type value
rabbitmqctl rabbitmqctl set_policy my-pol “^one-meg$” \ 
‘{“max-length-bytes”:1048576}’ \ 
—apply-to queues
rabbitmqctl on Windows rabbitmqctl.bat set_policy my-pol “^one-meg$” ^
“{““max-length-bytes””:1048576}” ^
—apply-to queues

my-pol 策略确保 one-meg 队列包含不超过 1MB 的消息数据。当达到1mB的限制时,最早的消息将从队列头中丢弃。

要定义溢出行为-是从头上删除消息还是拒绝新发布,需要将关键词 overflow 添加到策略定义中。例如:












rabbitmqctl rabbitmqctl set_policy my-pol “^two-messages$” \
  ’{“max-length”:2,”overflow”:”reject-publish”}’ \
  —apply-to queues
rabbitmqctl on Windows rabbitmqctl.bat set_policy my-pol “^two-messages$” ^
  “{“”max-length””:2,””overflow””:””reject-publish””}” ^
  —apply-to queues

my-pol 策略确保 two-messages 队列包含的消息不超过 2 条,并且所有其他发布都是基本发送的。只要队列包含 2 条消息并且发布者确认启用的情况下,其他发送的消息都会得到 basic.nack 响应。

策略配置也可以通过管理插件定义。详细请看 相关文档

在声明队列期间使用 x-arguments 定义最大队列长度

1)为队列声明参数 x-max-length 提供一个非负整数值来设置最大消息条数。

2)声明参数 x-max-length-bytes 提供一个非负整数值,设置最大字节长度。如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。

3)溢出行为可以通过向队列声明参数 x-overflow 提供字符串值来设置。可能的值是:
drop-head (默认值):从队列前面丢弃或 dead-letter 消息,保存后n条消息
reject-publish:最近发布的消息将被丢弃,即保存前n条消息。

下面 Java 中的这个示例声明了一个最大长度为10条消息的队列:

  1. //创建队列
  2. HashMap<String, Object> map = new HashMap<>();
  3. //设置队列最大的条数 10条
  4. map.put("x-max-length",10 );
  5. //设置队列溢出方式 保留前10条
  6. map.put("x-overflow","reject-publish" );
  7. channel.queueDeclare(queueName,false,false,false,map);

MessageConvert

其他网址

RabbitMQ:@RabbitListener 与 @RabbitHandler 及 消息序列化 - 简书

简介

  1. RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter 等。
  2. 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化。
  3. SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理;若是 String 则转成字节数组;若是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差。
  4. 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能。当然,也可以发送端手动将数据转为JSON格式,接收端序列化为指定对象。

使用 JSON 序列化与反序列化

RabbitMQ 提供了 Jackson2JsonMessageConverter 来支持消息内容 JSON 序列化与反序列化。被序列化对象应提供一个无参的构造函数,否则会抛出异常。

消息发送者:设置 MessageConverter 为 Jackson2JsonMessageConverter:

  1. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

消息消费者:配置 MessageConverter 为 Jackson2JsonMessageConverter:

  1. @Configuration
  2. public class RabbitMQConfig {
  3. @Bean
  4. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
  5. ConnectionFactory connectionFactory){
  6. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  7. factory.setConnectionFactory(connectionFactory);
  8. factory.setMessageConverter(new Jackson2JsonMessageConverter());
  9. return factory;
  10. }
  11. }

自定义序列化

发表评论

表情:
评论列表 (有 0 条评论,345人围观)

还没有评论,来说两句吧...

相关阅读