消息队列消息丢失的原因及解决办法 RabbitMQ 和 Kafka

た 入场券 2022-12-16 05:57 1153阅读 0赞

1. MQ一条消息的生产和消费过程

消息的丢失可能发生在Producer、Broker、Consumer 的任一阶段;

" class="reference-link">watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA4OTk5ODU_size_16_color_FFFFFF_t_70

2. RabbitMQ

2.1 生产者丢失数据原因

  1. 生产者将数据发送到 RabbitMQ 的时候,消息可能因为网络等问题在传入过程中给搞丢了。

2.1.1 生产者端解决方法

  • 开启 RabbitMQ 事务

    1. 使用用 RabbitMQ 提供的事务功能,就是生产者**发送数据之前**开启 RabbitMQ 事务`channel.txSelect`,然后发送消 息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务`channel.txRollback`,然后重试发送消息;如果收到了消息,那么可以提交事务`channel.txCommit,类似我们数据库数据库事务机制`
  • 开启 confirm 模式

    1. 在生产者端设置开启 `confirm` 模式之后,你每次写的消息都会**分配一个唯一的 id**,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 **`ack`** 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 **`nack`** 接口,告诉你这个消息接收失败,你可以重试。而且可以结合这个机制自己在内存里维护每个消息 id 的状态,如果**超过一定时间**还没接收到这个消息的回调,那么你可以**重发**。

事务机制和 confirm 机制最大的不同在于:

  1. 事务机制是同步的,提交一个事务之后会阻塞在那儿;基本上吞吐量会下来,耗性能。
  2. confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后消息 被RabbitMQ 接收了之后会异步回调你的一个接口,通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是采用 confirm 机制的。

2.2 RabbitMQ 弄丢了数据

如果是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

2.2.1 开启 RabbitMQ 的持久化

  1. 设置持久化有**两个步骤**:
  1. 创建 queue 的时候将其设置为持久化;保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
  2. deliveryMode 设置为 2。即将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

2.2.2 持久化机制与生产者的 confirm 机制配合

  1. 开启RabbitMQ 的持久化机制,也有一种可能 ,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。所以,持久化可以跟生产者那边的 `confirm` 机制配合起来,**只有消息被持久化到磁盘之后,才会通知生产者 `ack` 了**,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 `ack`,需要自己重发。

2.3 消费端丢失了数据

  1. RabbitMQ 如果丢失了数据,主要是**因为你消费的时候,刚消费到,还没处理,结果消费进程挂了。**导致RabbitMQ 认为已经消费了数据。

2.3.1 关闭 RabbitMQ 的自动 ack

  1. 关闭 RabbitMQ 的自动 `ack`,可以通过一个 api 来调用就行,确保每次数据处理完后**手动 `ack`**。这样的话,如果你还没处理完,不就没有 `ack` 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个**消费分配给别的 consumer 去处理**,消息是不会丢的。

3. Kafka

Kafka快是因为牺牲了消息可靠换取回来的性能,在最早期版本的确没提供消息可靠的策略,经过多个版本迭代后的功能完善,已经不存在这种旧观念。可靠的关键点主要有以下:

3.1 生产者可通过设置ack

  • 0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低;
  • 1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中;
  • -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长

实现上需要注意细节; 正确处理返回值或者捕捉异常,就可以保证这个阶段的消息不会丢失。

  1. 同步发送时,捕捉异常

    try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println(“ 消息发送成功”);
    } catch (Throwable e) {
    System.out.println(“ 消息发送失败”);
    System.out.println(e);
    }

    1. 异步发送时,则需要在会回调方法里检查

    producer.send(record, (metadata, exception) -> {
    if (metadata != null) {

    1. System.out.println(" 消息发送成功");

    } else {

    1. System.out.println(" 消息发送失败");
    2. System.out.println(exception);

    }
    });

3.2 消费端丢失数据原因

Kafka 消费端丢失数据同RabbitMq原因一致, 消费端拿到数据后还没处理,消费进程挂了结果消费者那边自动提交了 offset。导致Kafka 认为已经消费了数据。

3.2.1 关闭动提交offset

  • enable.auto.commit = true,不管执行结果如何,消费者会自动提交offset。
  • enable.auto.commit = false,需要用户需要手动提交offset,可以根据执行结果具体处理offset

3.3 Kafka 弄丢了数据

消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他副本上,一旦Broker出现故障,比如进程挂了或服务器宕机了,就可能会丢失消息。这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。但是此时其他的 follower 刚好还有一部分数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就丢失了一些数据?

如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免宕机引起的丢消息问题

  1. 单个节点:将消息写入磁盘
  2. 多个节点:将消息发送到2个以上的节点

    所以此时一般是要求起码设置如下 4 个参数:

    • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
    • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower ,可被选举为leader。
    • 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
    • 在 producer 端设置 retries=MAX(无限次重试):这个是要求一旦写入失败,就无限重试

发表评论

表情:
评论列表 (有 0 条评论,1153人围观)

还没有评论,来说两句吧...

相关阅读

    相关 消息队列任务丢失原因

    有时:一个异步处理的操作正常的放到了异步队列里,但是并没有被处理,或者数据库用改动,但是消费的binlog日志并没有改动信息,造成这种的原因是什么?对此进行一定猜测,消息队列是