kafka集群安装部署

迈不过友情╰ 2022-04-08 12:11 481阅读 0赞

kafka集群安装部署

  • 环境介绍
  • 安装部署
    • 安装zookeeper
  • kafka搭建
  • 遇到的问题

环境介绍

系统 : centos7
机器 : 10.5.32.1,10.5.32.2,10.5.32.3
kafka : kafka_2.11-0.11.0.1
zookeeper : kafka内置zookeeper

安装部署

先下载安装包

  1. cd /home/kafka
  2. wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
  3. tar -zxvf kafka_2.11-0.11.0.1.tgz
  4. cd kafka_2.11-0.11.0.1/

其他两台机器也是如此

安装zookeeper

修改zookeeper的配置文件(三台节点都一样)

  1. vim config/zookeeper.properties
  2. # 记住这个目录,后面会用到
  3. dataDir=/usr/kafka/data/zookeeper
  4. clientPort=2181
  5. initLimit=5
  6. syncLimit=2
  7. tickTime=2000
  8. server.1=10.5.32.1:2888:3888
  9. server.2=10.5.32.2:2888:3888
  10. server.3=10.5.32.3:2888:3888

创建上面的目录,并生成相应的myid文件

  1. mkdir -p /usr/kafka/data/zookeeper
  2. # 对应上面配置的serverid
  3. echo 1 > /usr/kafka/data/zookeeper/myid

保存退出,启动即可

  1. ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties &

zookeeper搭建完成

kafka搭建

  1. vim conf/server.properties

修改以下几个属性值

  1. # 其他两台机器分别为2和3
  2. broker.id=1
  3. # 分别对应每台机器的ip或者主机名
  4. listeners=PLAINTEXT://10.5.32.1:9092
  5. # 根据需求修改为相应的地址
  6. log.dirs=/var/log/kafka-logs
  7. # 我本来是想在zookeeper中添加一个节点,把这个kafka集群创建的所有节点都放在一个节点下面,
  8. # 但没有找到方法,如果有知道的,麻烦告知,谢谢
  9. zookeeper.connect=10.5.32.1:2181,10.5.32.2:2181,10.5.32.3:2181

依次启动即可

  1. ./bin/kafka-server-start.sh -daemon ./config/server.properties &

kafka搭建完成

遇到的问题

搭建完成之后测试,发现有时会出现大量丢失数据的现象,而且都是数据并没有发到kafka上,数据丢失率在90%左右,代码如下:

  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import java.util.ArrayList;
  4. import java.util.HashSet;
  5. import java.util.List;
  6. import java.util.Properties;
  7. import java.util.concurrent.ExecutionException;
  8. import java.util.concurrent.Future;
  9. public class ProducerTest {
  10. public static void main(String[] args) throws ExecutionException, InterruptedException {
  11. String x = "";
  12. Properties properties = new Properties();
  13. properties.put("bootstrap.servers", "10.5.32.1:9092");
  14. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  15. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  16. Producer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  17. for (int i = 0; i < 100000; i++) {
  18. kafkaProducer.send(new ProducerRecord<>("topic-name", x + i));
  19. }
  20. kafkaProducer.flush();
  21. kafkaProducer.close();
  22. }
  23. }

后在properties中加了如下两行配置,丢失数据的情况就没有了

  1. properties.put("linger.ms", 1);
  2. properties.put("batch.size", 10);

或者改为同步发送数据,丢失数据的情况也没了,代码如下:

  1. public class ProducerTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. String x = "";
  4. Properties properties = new Properties();
  5. properties.put("bootstrap.servers", "10.5.32.1:9092");
  6. properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  7. properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  8. Producer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  9. for (int i = 0; i < 100000; i++) {
  10. Future<RecordMetadata> send = kafkaProducer.send(new ProducerRecord<>("topic-name", x + i));
  11. }
  12. kafkaProducer.flush();
  13. kafkaProducer.close();
  14. }
  15. }

通过callback打印exception,发现报了如下错误

  1. org.apache.kafka.common.errors.TimeoutException: Expiring 1250 record(s) for topic-name-0: 35218 ms has passed since last append
  2. org.apache.kafka.common.errors.TimeoutException: Expiring 212 record(s) for topic-name-0: 35218 ms has passed since batch creation plus linger time

现在暂时没时间去查证,后续会更新,当然如果有看到的大神,麻烦评论或私信告知,谢谢

发表评论

表情:
评论列表 (有 0 条评论,481人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Kafka部署

    *1集群部署的基本流程** 下载安装包、解压安装包、修改配置文件、分发安装包、启动集群 **2集群部署的基础环境准备** 安装前的准备工作(zk集群已经部署完毕...

    相关 部署Kafka

    前言 关于kafka的工作机制,已经在上篇博文:[Kafka原理及单机部署][Kafka]中详细写出来,这里只是将kafka的一个群集部署写了出来。 > 博文大纲:

    相关 kafka部署

    在[上篇文章][Link 1]中讲解了kafka的单点部署,本篇讲解kafka的集群部署。 1.拷贝kafka配置文件并启动 进入kafka的解压文件,输入以下两行命令

    相关 Kafka 部署

      Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动