Kafka学习笔记

ゝ一纸荒年。 2024-03-24 17:05 123阅读 0赞

Kafka学习笔记

概念

Kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

消息队列的两种模式

点对点模式

消费者主动拉取数据,消息收到后清楚消息;一个消息只能被消费一次。
在这里插入图片描述

发布/订阅模式

  1. 可以有多个topic主题(浏览、点赞、收藏、评论等)
  2. 消费者消费数据后,不删除数据;数据可以被重复消费
  3. 每个消费者相互独立,都可以消费到数据
    在这里插入图片描述

Kafka整体架构

  1. 为了方便扩展,提高吞吐量,一个topic分为多个partition
  2. 配合分区的概念,提出了消费组的概念,组内每个消费者并行消费;每个消费者只能消费一个分区的数据
  3. 为了提高可用性,为每个partition增加若干副本
  4. ZK中记录谁是leader,2.8以后可以不配置ZK

由于数据可能非常多,kafka将topic分为多个partition;一个topic可以存储在多个分区中。
在这里插入图片描述
在这里插入图片描述

1个topic存储在多个分区中,那么消费者也可以并行消费;为每个分区指定一个消费者消费数据
在这里插入图片描述

为了提高分区的可靠性,引入了副本,副本分为leader和follower,不管是生产者还是消费者,都是对leader中的数据进行处理;当leader出现故障,就用follower进行替代。
在这里插入图片描述
zooker记录集群中机器上下线的信息,还会记录每个分区中谁是leader副本。
在这里插入图片描述

windows下Kafka安装启动

下载地址:kafka官网下载在这里插入图片描述
各个目录的内容:

  • bin:启动脚本,里面还有一个windows目录,其中放置的是windows下的启动命令;
  • config:配置文件;
  • libs:依赖的jar;
  • logs:日志;
  • site-docs:文档

启动服务

在windows目录下,打开cmd输入下面的命令,启动zookeeper服务

  1. zookeeper-server-start.bat ..\..\config\zookeeper.properties

启动kafka服务

  1. kafka-server-start.bat ..\..\config\server.properties

启动消费者,指定topic的名称为first

  1. kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first --from-beginning

启动生产者,指定topic的名称为first

  1. kafka-console-producer.bat --broker-list localhost:9092 --topic first

Kafka生产者

发送消息原理

在消息发送的过程中,涉及到了两个线程:main线程和sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断地从RecordAccumulator中拉取消息发送到Kafka Broker。

生产者发送消息流程

  1. 生产者调用send方法发送数据
  2. 数据经过拦截器处理,一般不调用此拦截器
  3. 序列化器:将数据序列化
  4. 分区器:决定将数据发送到哪个分区;实际是分区器是一个双端队列;
  5. 发送数据需要满足以下两个条件:

    • batch.size:数据累计达到batch.size后,sender才会发送数据,默认16k
    • linger.ms:如果数据没有达到batch.size,那么sender等待linger.ms设置的时间到了之后就会发送数据
  6. 集群收到发送的数据后,会进行应答

    • 0:生产者发送过来的数据,不需要等待数据落盘应答
    • 1:生产者发送过来的数据,Leader收到数据后应答;
    • -1:生成者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答,-1和all等价;
  7. 发送数据失败后会进行重试,默认重试次数是整数的最大值;

在这里插入图片描述

生产者的重要参数

数据发送的两种方式

数据发送分为同步发送和异步发送。异步发送分为带回调的异步发送和不带回调的异步发送。

同步发送
异步发送

生产者发送消息分区策略

分区策略指的是消息发送到哪个分区的策略。
Kafka中的分区策略可以通过DefaultPartitioner这个类中的方法进行指定。

  1. 如果指定了partition,则把数据写入指定的分区
  2. 没有指定partition的值但是有key,则将key的hash值与topic的partition数进行取余得到partition值
  3. 既没有指定partition值又没有key值的情况下,kafka采用Sticky Partition(粘性分区器)策略,遂选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机选择一个和上次不一样的分区进行使用

生产环境中,可以使用表名作为key,让指定类型的数据都发送到同一个分区中。

自定义分区

如果有需求的话,可以自定义分区。只要实现Partitioner接口,然后重写partition()方法
在这里插入图片描述

生产经验

如何提高吞吐量
数据可靠性

数据可靠性和前面提到的ACK应答级别有关。

当ACK应答级别为0时,生产者发送过来的数据,不需要等到数据持久化到磁盘中就可以进行应答,
问题: 如果Leader发生故障,此时数据没有持久化,而已经进行了应答,那么会丢失数据
在这里插入图片描述
当ACK应答级别为1时,生产者发送过来的数据,Leader收到数据后应答;
问题:应答完成后,没有把数据同步到副本,然后Leader挂了,这时候会通过选举选出新的Leader,但是新的Leader中没有数据,生产者也不会重新发送消息,因为已经收到应答了
在这里插入图片描述
当应答ACK级别为-1时,生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答;
问题:如果某个Follwer出现故障,导致不能同步数据。
Kafka的解决方案是:如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。这样就不用等长期联系人不上或者已经故障的节点。
在这里插入图片描述
ISR队列:Leader维护了一个动态的in-sync replica set (ISR),意为和Leader保持同步的Follower+Leader集合(Leader:0,isr:0, 1, 2)

如果分区副本设置为1个,或者ISR里应答的最小副本数量(min.insync.replicas,默认为1)设置为,和ack=1的效果相同,任然有丢数的风险(leader:0,isr:0)。
数据完全可靠条件=ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据可靠性总结

  • acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
  • acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
  • acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follower应答,可靠性高,效率低;
    在生产环境中,acks=0很少使用,一般用于传输普通日志,允许丢失个别数据;acks=-1一般用于对可靠性要求比较高的场景,比如钱等;
数据去重
数据传递语义
  • 至少一次 = ACK级别设置为-1 + 分区副本数量>=2 + ISR里应答的最小副本数量>=2
  • 最多一次 = ACK级别设置0
  • 总结

    • 至少一次可以保证数据不丢失,但是不能保证数据不重复
    • 最多一次可以保证数据不重复,但是不能保证数据不丢失
  • 精确一次:对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不能丢失;
数据重复场景

当acks=-1时,生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答;当Leader出现故障,而其它Follower还没收到数据,那么生产者会继续发送重复的数据
在这里插入图片描述
数据重复的解决方案:幂等性;开启参数enable.idempotence,默认为true,false为关闭。

幂等性:Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条数据;
精确一次 = 幂等性+至少一次(ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)
重复消息的判断标准:具有PID,Partition,SeqNumber相同主键的消息提交时;Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;SequenceNumber是单调自增的。所以幂等性只能保证在单分区单会话内消息不重复。

在这里插入图片描述

Kafka生产者事务

开启事务,必须开启幂等性;
在这里插入图片描述

数据乱序
  1. kafka在1.x版本之前保证数据单分区有序,条件如下:

    • max.in.flight.request.per.connection=1,不需要考虑是否开启幂等性;
  2. kafka在1.x及以后的版本中保证数据单分区有序,条件如下:

    • 未开启幂等性:max.in.flight.request.per.connection需要设置为1;
    • 开启幂等性:max.in.flight.requests.per.connection需要设置小于等于5,因为在kafka 1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,所以无论如何,都可以保证最近5个request的数据有序
      在这里插入图片描述

Kafka Broker

#

broker重要参数






























































































参数名称 描述
replica.lag.time.max.ms ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引
log.retention.hours Kafka 中数据保存的时间,默认 7 天
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是 5 分钟
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最
大值,9223372036854775807。一般不建议修改,
交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建
议修改,交给系统自己管理

kafka副本

副本基本信息
Leader选举流程
Leader和Follower故障处理
分区副本分配

kafka消费者

Kafka消费方式

  • pull模式:consumer采用从Broker中拉取数据。kafka采用这种方式。缺点是如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。
  • push模式:由于由broker决定消息发送速率,很难适应所有消费者的消费速率。

消费者工作流程

  1. 每个分区的数据只能由消费组中一个消费者消费;即同一个消费组中的消费者,不能消费同一个分区的数据;但是一个消费者可以消费多个分区数据;
  2. kafka中使用系统主题记录每个消费者消费了多少消息,具体是每个消费者把offset提交到系统主题中保存;
    在这里插入图片描述
消费者组

消费者组:Consumer Group,由多个consumer组成,形成一个消费者组的条件是所有消费者的groupid相同;

  1. 消费者组内每个消费者消费不同分区的数据,一个分区只能由一个组内消费者消费;
  2. 消费者组之间互不影响;所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者;
    在这里插入图片描述
消费者组初始化流程

coordinator:辅助实现消费者组的初始化和分区的分配;
- coordinator节点选择 = groupid的hash值 % 50 (_consumer_offset的分区数量)

  1. 每个消费者都发送JoinGroup请求到coordinator中;
  2. coordinator选择一个consumer作为leader;
  3. 把要消费的topic情况,发送给leader消费者;
  4. leader负责指定消费方案;
  5. 把消费方案发给coordinator;
  6. coordinator把消费方案发送给各个consumer;
  7. 每个消费者都会和coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms=5分钟),也会触发再平衡;

在这里插入图片描述

分区的分配以及再平衡

一个consumer group由多个consumer组成,一个topic由多个partition组成,那么由哪个conumser消费哪个partition的数据

kafka有四种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略,默认策略是Range+CooperativeSticky。Kafka可以同时使用多个分区分配策略

Range分区策略

在这里插入图片描述
在这里插入图片描述

RoundRobin 分区策略原理

在这里插入图片描述
在这里插入图片描述

消费者消费流程
  1. consumer创建ConsumerNetworkClient;
  2. consumer发送消费请求(sendFetches);

    • Fetch.min.bytes:每批次最小抓取大小,默认1字节;
    • fetch.max.wait.ms:一批数据最小值未达到的超时时间,默认500ms;
    • Fetch.max.bytes:每批次最大抓取大小,默认50ms;
  3. kafka集群收到请求,通过回调函数将消息放到队列中;
  4. consumer从队列中拉取数据

    • Max.poll.records:一次拉取数据返回消息的最大条数,默认500条;
  5. 对数据进行反序列化,经过拦截器处理,最后处理数据;

在这里插入图片描述

参考–

  • kafka视频

发表评论

表情:
评论列表 (有 0 条评论,123人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Kafka学习笔记

    Kafka学习笔记 概念 Kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域。 发布/订阅:消息的发布者不会将消息直接发送给特定的订

    相关 Kafka学习笔记

    简介   消息队列在平时开发时,只要涉及到高并发,解耦,异步处理等,最好的方式就是引入消息队列,Kafka 是一个分布式的基于发布/订阅模式的消息队列,其具有高吞吐,可恢