【kafka源码】kafka分区副本的分配规则

快来打我* 2024-04-08 13:47 194阅读 0赞

5ba102a2cc5a4f9ca157a4d3205a7b56.png

源码分析

创建Topic的源码入口 AdminManager.createTopics()

以下只列出了分区分配相关代码其他省略

  1. def createTopics(timeout: Int,
  2. validateOnly: Boolean,
  3. toCreate: Map[String, CreatableTopic],
  4. includeConfigsAndMetatadata: Map[String, CreatableTopicResult],
  5. responseCallback: Map[String, ApiError] => Unit): Unit = {
  6. // 1. map over topics creating assignment and calling zookeeper
  7. val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
  8. val metadata = toCreate.values.map(topic =>
  9. try {
  10. val assignments = if (topic.assignments().isEmpty) {
  11. AdminUtils.assignReplicasToBrokers(
  12. brokers, resolvedNumPartitions, resolvedReplicationFactor)
  13. } else {
  14. val assignments = new mutable.HashMap[Int, Seq[Int]]
  15. // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
  16. // this follows the existing logic in TopicCommand
  17. topic.assignments.asScala.foreach {
  18. case assignment => assignments(assignment.partitionIndex()) =
  19. assignment.brokerIds().asScala.map(a => a: Int)
  20. }
  21. assignments
  22. }
  23. trace(s"Assignments for topic $topic are $assignments ")
  24. }
  25. 复制代码
  1. 以上有两种方式,一种是我们没有指定分区分配的情况也就是没有使用参数--replica-assignment;一种是自己指定了分区分配

1. 自己指定了分区分配规则

从源码中得知, 会把我们指定的规则进行了包装,注意它并没有去检查你指定的Broker是否存在;

2. 自动分配 AdminUtils.assignReplicasToBrokers

format_png

  1. 参数检查: 分区数>0; 副本数>0; 副本数<=Broker数 (如果自己未定义会直接使用Broker中个配置)
  2. 根据是否有 机架信息来进行不同方式的分配;
  3. 要么整个集群都有机架信息,要么整个集群都没有机架信息; 否则抛出异常

副本分配的几个原则:

  1. 将副本平均分布在所有的 Broker 上;
  2. partition 的多个副本应该分配在不同的 Broker 上;
  3. 如果所有的 Broker 有机架信息的话, partition 的副本应该分配到不同的机架上。

无机架方式分配

AdminUtils.assignReplicasToBrokersRackUnaware

  1. /**
  2. * 副本分配时,有三个原则:
  3. * 1. 将副本平均分布在所有的 Broker 上;
  4. * 2. partition 的多个副本应该分配在不同的 Broker 上;
  5. * 3. 如果所有的 Broker 有机架信息的话, partition 的副本应该分配到不同的机架上。
  6. *
  7. * 为实现上面的目标,在没有机架感知的情况下,应该按照下面两个原则分配 replica:
  8. * 1. 从 broker.list 随机选择一个 Broker,使用 round-robin 算法分配每个 partition 的第一个副本;
  9. * 2. 对于这个 partition 的其他副本,逐渐增加 Broker.id 来选择 replica 的分配。
  10. */
  11. private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
  12. replicationFactor: Int,
  13. brokerList: Seq[Int],
  14. fixedSta

发表评论

表情:
评论列表 (有 0 条评论,194人围观)

还没有评论,来说两句吧...

相关阅读

    相关 kafka关于理解

    前言 副本机制是许多存储引擎必备的,在数据存储时候,为了保证数据的高可靠性,常常需要将主节点数据进行备份存储,即保存一份与主节点相同的数据集,一旦主节点发生宕机等故障,通

    相关 kafka分区分配策略

    1 Range 在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。 Range是默认策略。Range是对每个Topic而言的(即一个Top