Kafka集群搭建

以你之姓@ 2022-05-09 05:20 515阅读 0赞

一,环境准备

  1. \* Zookeeper单点/集群服务(演示单点使用单点, 演示集群使用集群)
  2. \* kafka安装包
  3. -- kafka\_2.11-1.1.0.tgz
  4. \* 自己玩, 基于虚拟机搭建

二,Kafka单点搭建

  1. 1,安装Zookeeper安装包,可以参考之前Zookeeper集群搭建,不做赘述
  2. 2,上传解压kafka安装包到指定目录
  3. tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/develop/
  4. 3,修改./conf/server.properties配置文件,修改zookeeper链接地址为真实地址
  5. zookeeper.connect=192.168.91.131:2181
  6. 4,启动zookeeper服务,启动前切记关闭防火墙或者开放服务端口(zk2181kafka9092
  7. // 关闭防火墙
  8. service iptables stop
  9. // 启动zookeeper服务
  10. sh ./bin/zkServer.sh start
  11. // 查看zookeeper服务状态
  12. sh ./bin/zkServer.sh status
  13. 5kafka关联server.properties配置信息启动,&表示后台启动
  14. sh kafka-server-start.sh ../config/server.properties &

70

  1. 6,查看kafka启动状态,存在进程,说明启动成功
  2. ps -ef | grep kafka

70 1

  1. 7,添加topic,下列命令表示在zookeeper服务上添加名称为“standloneTest”的topic
  2. \* --partitions 2 :表示该话题存在两个分区,不指定默认使用(num.partitions配置信息),分区数用于限制对Consumer并行数的限制,Consumer并行数不能大于partition分区数,若超出则分配完分区后,其他Consumer轮空
  3. \* --replication-factor 1 表示每个分区存在一个副本,不指定默认使用(default.replication.factor),具体含义下一篇分析
  4. sh kafka-topics.sh --create --zookeeper 192.168.91.131:2181 --replication-factor 1 --partitions 2 --topic standloneTest
  5. 8kafka日志文件查看节点状态
  6. \* 日志文件路径,在server.properties中配置,下列演示为默认路径演示
  7. log.dirs=/tmp/kafka-logs
  8. \* 查看日志文件

70 2

  1. \* 从上图可以看出,话题standloneTest创建成功,且成功构造出两个分区-0和分区-1
  2. 9zookeeper节点查看topic信息

70 3

  1. \* zookeeper节点可以看出, 跟节点下创建了需要支撑kafka的一系列节点,
  2. \* /brokers节点下存在当前kafka节点的topic信息集合,
  3. \* 进入/brokers/topics节点,可以看到该节点上存储上的所有topic,进入创建的topic standardTest
  4. \* 进入/brokers/tipics/standardTest节点,可以看到当前topic主题的分区信息,该topic分为两个分区,则节点下有01两个分区,其中展示的01每个分区,表示该分区在该broker节点下的副本,用于集群中数据同步
  5. \* 获取分区状态信息,如下,具体分区详情信息在集群搭建中再做详解

70 4

  1. 10kafka常用命令
  2. //kafka后台启动命令
  3. sh ./bin/kafka-server-start.sh -daemon ../config/server.properties
  4. // kafka停止命令,
  5. sh ./bin/kafka-server-stop.sh
  6. // 创建topic命令
  7. sh ./bin/kafka-topics.sh --create --zookeeper 192.168.91.131:2181 --replication-factor 1 --partitions 2 --topic standloneTest
  8. // 查看topic列表
  9. sh ./bin/kafka-topics.sh --list --zookeeper 192.168.91.131:2181
  10. // 修改topic
  11. sh ./bin/kafka-topics.sh --alert --zookeeper 192.168.91.131:2181 --partitions 1 --topic myTest
  12. // 查看topic信息
  13. sh ./bin/kafka-topics.sh --describe --zookeeper 192.168.91.131:2181 --topic myTest
  14. // 删除topic
  15. sh ./bin/kafka-topics.sh --delete --zookeeper 192.168.91.131:2181 --topic myTest
  16. // 控制台生产者
  17. sh ./bin/kafka-console-producer.sh --broker-list 192.168.91.131:9092 --topic myTest
  18. // 控制台消费者
  19. sh ./bin/kafka-console-consumer.sh --zookeeper 192.168.91.131:2181 --from-beginning --topic myTest

三,Kafka集群搭建

  1. 1,环境准备
  2. \* zookeeper集群环境
  3. \* 三台虚拟机作为服务器搭载kafka
  4. 2,修改./conf/server.properties配置文件
  5. \* 修改zookeeper链接
  6. zookeeper.connect=192.168.91.128:2181,192.168.91.129:2181,192.168.91.130:2181
  7. \* 修改broker.id(默认值为0),三台broker.id不一致,
  8. broker.id=1
  9. broker.id=2
  10. broker.id=3
  11. \* 修改listeners,修改为当前IP
  12. listeners=PLAINTEXT://192.168.91.128:9092
  13. 3,逐节点启动zookeeper集群和kafka集群,链接zookeeper查询节点信息,三个broker在注册中心全部注册成功

70 5

  1. 4,基于集群模式创建topic,创建三个分区,每个分区上创建三个副本。保证每个分区的Leader副本可以分布在三台机器上,并且在另外两台机器上分别创建一个Follower副本
  2. sh kafka-topics.sh --create --zookeeper 192.168.91.129:2181,192.168.91.129:2181,192.168.91.130:2181 --replication-factor 3 --partitions 3 --topic clusterTest
  3. 5,在日志文件查看分区副本信息,可以看到该服务器下有三个分区的信息,其中一个分区的副本信息为三个副本信息的Leader副本,其余两个分区的副本信息为该分片的Follower副本;副本存在目的主要用于实现数据同步,且副本详情信息存储在zookeeper节点

70 6

  1. 6,在zookeeper客户端查看副本详情
  2. \* 查看副本信息详情命令
  3. get /brokers/topics/clusterTest/partitions/0/state

70 7

  1. \* 节点信息分析
  2. -- leader 标识当前分区的Leader副本所在的broker.id
  3. -- leader\_epoch zookeeper集群leader选举的epoch概念,标识当前Leader的版本号
  4. -- isr 当前集群下的分片副本信息集合,随着集群信息变化,该isr值动态变更
  5. -- Leader副本所在broker宕机后,会触发Leader重新选举,leader\_epoch1

四,Java连接Kafka集群进行消息处理

  1. 1,服务端发送消息
  2. public class KafkaProducerDemo extends Thread {
  3. private final KafkaProducer<Integer, String> producer;
  4. private final String topic;
  5. public KafkaProducerDemo(String topic) {
  6. Properties properties = new Properties();
  7. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  8. "192.168.91.129:9092,192.168.91.129:9092,192.168.91.130:9092");
  9. properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerDemo");
  10. properties.put(ProducerConfig.ACKS_CONFIG, "-1");
  11. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  12. "org.apache.kafka.common.serialization.IntegerSerializer");
  13. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  14. "org.apache.kafka.common.serialization.StringSerializer");
  15. producer = new KafkaProducer<Integer, String>(properties);
  16. this.topic = topic;
  17. }
  18. @Override
  19. public void run() {
  20. for (int i = 0; i < 10; i++) {
  21. String message = "producer message : " + i;
  22. System.out.println(message);
  23. producer.send(new ProducerRecord<Integer, String>(topic, message));
  24. try {
  25. Thread.sleep(1000);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. public static void main(String[] args) {
  32. KafkaProducerDemo producerDemo = new KafkaProducerDemo("clusterTest");
  33. producerDemo.start();
  34. }
  35. }

70 8

  1. 2,客户端消费消息
  2. public class KafkaConsumerDemo extends Thread {
  3. private final KafkaConsumer<String, Integer> kafkaConsumer;
  4. public KafkaConsumerDemo(String topic) {
  5. Properties properties = new Properties();
  6. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  7. "192.168.91.129:9092,192.168.91.129:9092,192.168.91.130:9092");
  8. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo");
  9. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  10. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  11. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  12. "org.apache.kafka.common.serialization.IntegerDeserializer");
  13. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  14. "org.apache.kafka.common.serialization.StringDeserializer");
  15. // 从第一个顺序读取
  16. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  17. kafkaConsumer = new KafkaConsumer<String, Integer>(properties);
  18. kafkaConsumer.subscribe(Collections.singleton(topic));
  19. }
  20. @Override
  21. public void run() {
  22. while (true) {
  23. ConsumerRecords<String, Integer> consumerRecords = kafkaConsumer.poll(1000);
  24. for (ConsumerRecord record : consumerRecords) {
  25. System.out.println("consumer receive : " + record.value());
  26. }
  27. }
  28. }
  29. public static void main(String[] args) {
  30. KafkaConsumerDemo consumerDemo = new KafkaConsumerDemo("clusterTest");
  31. consumerDemo.start();
  32. }
  33. }

70 9

发表评论

表情:
评论列表 (有 0 条评论,515人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Kafka

    Kafka使用背景 在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样一些问题: 我想分析一下用户行为(pageviews),以便我能设计出更

    相关 Kafka

    Kafka初识 1、Kafka使用背景 在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题: 1. 我们想分析下用户行为(pageviews

    相关 Kafka

    前言:kafka作为一个消息中间件,由linkedin使用scala编写,用作LinkedIn的活动流,和运营数据处理管道的基础,其特点在于具有高水平扩展也就是动态扩容和高吞吐

    相关 Kafka

    一,环境准备           \ Zookeeper单点/集群服务(演示单点使用单点, 演示集群使用集群)           \ kafka安装包        

    相关 Kafka

    Kafka集群搭建 1、软件环境 1、linux一台或多台,大于等于2 2、已经搭建好的zookeeper集群(参考我上一篇zk集群搭建:[https://blo

    相关 Kafka

    1. 集群部署的基本流程 下载安装包、解压安装包、修改配置文件、分发安装包、启动集群 2.集群部署的基础环境准备 **安装前的准备工作(zk集群已经部署完毕)**...