RocketMQ源码分析:Producer中延迟故障机制

我不是女神ヾ 2024-03-26 22:06 207阅读 0赞

前言

RocketMQ中的延迟故障机制是为了帮助Producer能够通过消息发送延迟或者消息发送结果主动感知Broker忙碌或者故障,消息发送延迟或者消息发送失败时可以将Broker排除在选择列表之外。这个机制默认是不开启的,如果需要开启这个机制需要在创建Producer时主动开启。

消息延迟故障机制使用

消息延迟故障机制需要在创建Producer时,通过producer.setSendLatencyFaultEnable(true)主动开启才能够生效。

  1. DefaultMQProducer producer = new DefaultMQProducer();
  2. producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
  3. producer.setSendLatencyFaultEnable(true);
  4. 复制代码

消息延迟故障机制生效时机

这个机制只有在producer发送消息时,由Producer自动选择MessageQueue才会生效,如果发送消息时指定了要发送的MessageQueue或者指定了MessageQueueSelector,即使开启了这个配置,也不会生效。

消息延迟故障机制源码分析

RocketMQ消息延迟故障机制的源码在org/apache/rocketmq/client/latency包下,在这个包下只有一个接口LatencyFaultTolerance,以及两个类LatencyFaultToleranceImpl,MQFaultStrategy,下面我们来分析下源码。

LatencyFaultTolerance源码分析

LatencyFaultTolerance接口的作用是提供broker延迟信息记录的方法。它提供了broker延迟信息更新,查询,删除的方法以及根据延迟信息选择延迟相对较小的broker。

  1. public interface LatencyFaultTolerance<T> {
  2. // 更新延迟信息
  3. void updateFaultItem(final T name/*brokerName*/, final long currentLatency/*当前延迟*/, final long notAvailableDuration/*不可用周期*/);
  4. // 查询broker是否可用
  5. boolean isAvailable(final T name/*brokerName*/);
  6. // 删除延迟信息
  7. void remove(final T name/*brokerName*/);
  8. // 随机选择一个broker
  9. T pickOneAtLeast();
  10. }
  11. 复制代码

LatencyFaultToleranceImpl源码分析

通过名字就可以看出LatencyFaultToleranceImpl是LatencyFaultTolerance的实现类。它包含了两个成员变量

  • faultItemTable

faultItemTable缓存了broker和延迟信息的关系。它是一个key是brokername,value是记录broker延迟信息的对象,

  • randomItem

ThreadLocalIndex内部包含一个T

发表评论

表情:
评论列表 (有 0 条评论,207人围观)

还没有评论,来说两句吧...

相关阅读