消息队列消息丢失的原因及解决办法 RabbitMQ 和 Kafka
1. MQ一条消息的生产和消费过程
消息的丢失可能发生在Producer、Broker、Consumer 的任一阶段;
" class="reference-link">
2. RabbitMQ
2.1 生产者丢失数据原因
生产者将数据发送到 RabbitMQ 的时候,消息可能因为网络等问题在传入过程中给搞丢了。
2.1.1 生产者端解决方法
开启 RabbitMQ 事务
使用用 RabbitMQ 提供的事务功能,就是生产者**发送数据之前**开启 RabbitMQ 事务`channel.txSelect`,然后发送消 息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务`channel.txRollback`,然后重试发送消息;如果收到了消息,那么可以提交事务`channel.txCommit,类似我们数据库数据库事务机制`。
开启
confirm
模式在生产者端设置开启 `confirm` 模式之后,你每次写的消息都会**分配一个唯一的 id**,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 **`ack`** 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 **`nack`** 接口,告诉你这个消息接收失败,你可以重试。而且可以结合这个机制自己在内存里维护每个消息 id 的状态,如果**超过一定时间**还没接收到这个消息的回调,那么你可以**重发**。
事务机制和 confirm
机制最大的不同在于:
- 事务机制是同步的,提交一个事务之后会阻塞在那儿;基本上吞吐量会下来,耗性能。
confirm
机制是异步的,你发送个消息之后就可以发送下一个消息,然后消息 被RabbitMQ 接收了之后会异步回调你的一个接口,通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是采用confirm
机制的。
2.2 RabbitMQ 弄丢了数据
如果是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
2.2.1 开启 RabbitMQ 的持久化
设置持久化有**两个步骤**:
- 创建 queue 的时候将其设置为持久化;保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
deliveryMode
设置为 2。即将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
2.2.2 持久化机制与生产者的 confirm
机制配合
开启RabbitMQ 的持久化机制,也有一种可能 ,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。所以,持久化可以跟生产者那边的 `confirm` 机制配合起来,**只有消息被持久化到磁盘之后,才会通知生产者 `ack` 了**,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 `ack`,需要自己重发。
2.3 消费端丢失了数据
RabbitMQ 如果丢失了数据,主要是**因为你消费的时候,刚消费到,还没处理,结果消费进程挂了。**导致RabbitMQ 认为已经消费了数据。
2.3.1 关闭 RabbitMQ 的自动 ack
关闭 RabbitMQ 的自动 `ack`,可以通过一个 api 来调用就行,确保每次数据处理完后**手动 `ack`**。这样的话,如果你还没处理完,不就没有 `ack` 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个**消费分配给别的 consumer 去处理**,消息是不会丢的。
3. Kafka
Kafka快是因为牺牲了消息可靠换取回来的性能,在最早期版本的确没提供消息可靠的策略,经过多个版本迭代后的功能完善,已经不存在这种旧观念。可靠的关键点主要有以下:
3.1 生产者可通过设置ack:
- 0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低;
- 1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中;
- -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长
实现上需要注意细节; 正确处理返回值或者捕捉异常,就可以保证这个阶段的消息不会丢失。
同步发送时,捕捉异常
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println(“ 消息发送成功”);
} catch (Throwable e) {
System.out.println(“ 消息发送失败”);
System.out.println(e);
}- 异步发送时,则需要在会回调方法里检查
producer.send(record, (metadata, exception) -> {
if (metadata != null) {System.out.println(" 消息发送成功");
} else {
System.out.println(" 消息发送失败");
System.out.println(exception);
}
});
3.2 消费端丢失数据原因
Kafka 消费端丢失数据同RabbitMq原因一致, 消费端拿到数据后还没处理,消费进程挂了,结果消费者那边自动提交了 offset。导致Kafka 认为已经消费了数据。
3.2.1 关闭动提交offset
- enable.auto.commit = true,不管执行结果如何,消费者会自动提交offset。
- enable.auto.commit = false,需要用户需要手动提交offset,可以根据执行结果具体处理offset
3.3 Kafka 弄丢了数据
消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他副本上,一旦Broker出现故障,比如进程挂了或服务器宕机了,就可能会丢失消息。这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。但是此时其他的 follower 刚好还有一部分数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就丢失了一些数据?
如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免宕机引起的丢消息问题
- 单个节点:将消息写入磁盘
多个节点:将消息发送到2个以上的节点
所以此时一般是要求起码设置如下 4 个参数:
- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 - 在 Kafka 服务端设置
min.insync.replicas
参数:这个值必须大于 1,要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower ,可被选举为leader。 - 在 producer 端设置
acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。 - 在 producer 端设置
retries=MAX
(无限次重试):这个是要求一旦写入失败,就无限重试。
- 给 topic 设置
还没有评论,来说两句吧...