Kafka安装详细教程

阳光穿透心脏的1/2处 2023-02-13 05:42 242阅读 0赞

一、安装Zookeeper

本文以 zookeeper-3.4.10 版本

1、下载

下载地址:https://zookeeper.apache.org/releases.html

2、解压

解压到 F:\tool 目录

3、修改配置

找到解压后 F:/tool/zookeeper-3.4.10/config/zoo_sample.cfg 文件

复制一份重命名为zoo.cfg (默认的名称是这个),

  1. #修改配置
  2. dataDir=f:/zookeeper/data
  3. #新增配置
  4. dataLogDir=f:/zookeeper/log

zoo.cfg文件说明:

①tickTime:维持leader与follower以及客户端之间的心跳频率时间(单位毫秒,默认2000ms)

②initLimit:允许follower从节点追随leader主节点时的初始延迟,默认是10次,也就是允许的延迟时间为2000*10=20秒。如果在这个时间内还没有连接上的话,leader就会认为follower有问题,将会抛弃你不跟你玩了。

③syncLimit:默认是5.leader与follower之间的协作允许最大延迟时长,tickTime * syncLimit=2000*5=10秒。10秒之内没响应,也会认为follower存在问题。

④dataDir:用于存储持久化文件(日志、快照之类的)的路径。默认 /tmp/zookeeper 最好不要用/tmp这类的目录,一般改为/var,var在Linux中是存放临时文件的。

⑤cliientPort:客户端连接的端口号。默认是2181

⑥maxClientCnxns:最大的客户端连接数。也就是最多允许多少个客户端连接本台机器的zk

⑦配置集群中所有的节点信息:3888端口用于在没有leader时所有节点在3888建立socket连接进行投票选举出leader;2888端口是follower从节点连接leader主节点的端口。

  1. server.1=localhost:2887:3887
  2. server.2=localhost:2888:3888
  3. server.3=localhost:2889:3889

在zk中,leader可以理解成是“谦让”出来的,1,2,3,4中谁是最大的谁就立马是leader

过半通过:一共4个节点有 4/2+1=3台通过就行。比如1,2,3启动了之后3就是leader,因为遵循的是过半通过原则。

4、添加系统环境变量:

  1. 在系统变量中添加ZOOKEEPER\_HOME = D:\\Java\\Tool\\zookeeper-3.4.10
  2. 编辑path系统变量,添加为路径%ZOOKEEPER\_HOME%\\bin

5、启动Zookeeper

使用 Dos 输入命令 :zkServer

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2JhYnlsb3Zld2Vp_size_16_color_FFFFFF_t_70

二、安装kafka

1、下载

Kafka官网下载安装包 http://kafka.apache.org/downloads.html

我们下载第二种(已经被编译过的),将安装包存在在 /software/ 下

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2JhYnlsb3Zld2Vp_size_16_color_FFFFFF_t_70 1

2、解压

解压到 F:/tool 目录

3、修改配置

配置文件目录:F:/tool/kafka_2.13-2.4.1/config

修改配置文件:F:/tool/kafka_2.13-2.4.1/config/server.properties

找到以下配置项修改(事先讲好文件夹:F:/tool/kafka/log)

  1. log.dirs=F:/tool/kafka/log
  2. zookeeper.connect=localhost:2181

配置文件说明

server.properties

  1. #broker的全局唯一编号,不能重复
  2. broker.id=0
  3. #用来监听链接的端口,producer或consumer将在此端口建立连接
  4. port=9092
  5. #处理网络请求的线程数量
  6. num.network.threads=3
  7. #用来处理磁盘IO的线程数量
  8. num.io.threads=8
  9. #发送套接字的缓冲区大小
  10. socket.send.buffer.bytes=102400
  11. #接受套接字的缓冲区大小
  12. socket.receive.buffer.bytes=102400
  13. #请求套接字的缓冲区大小
  14. socket.request.max.bytes=104857600
  15. #kafka消息存放的路径
  16. log.dirs=/home/servers-kafka/logs/kafka
  17. #topic在当前broker上的分片个数
  18. num.partitions=2
  19. #用来恢复和清理data下数据的线程数量
  20. num.recovery.threads.per.data.dir=1
  21. #segment文件保留的最长时间,超时将被删除
  22. log.retention.hours=168
  23. #滚动生成新的segment文件的最大时间
  24. log.roll.hours=168
  25. #日志文件中每个segment的大小,默认为1G
  26. log.segment.bytes=1073741824
  27. #周期性检查文件大小的时间
  28. log.retention.check.interval.ms=300000
  29. #日志清理是否打开
  30. log.cleaner.enable=true
  31. #broker需要使用zookeeper保存meta数据
  32. zookeeper.connect=hadoop02:2181,hadoop03:2181,hadoop04:2181
  33. #zookeeper链接超时时间
  34. zookeeper.connection.timeout.ms=6000
  35. #partion buffer中,消息的条数达到阈值,将触发flush到磁盘
  36. log.flush.interval.messages=10000
  37. #消息buffer的时间,达到阈值,将触发flush到磁盘
  38. log.flush.interval.ms=3000
  39. #删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
  40. delete.topic.enable=true
  41. #此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producerconnection to localhost:9092 unsuccessful 错误!
  42. host.name=hadoop02

consumer.properties

  1. # zookeeper连接服务器地址
  2. zookeeper.connect=hadoop02:2181,hadoop03:2181,hadoop04:2181
  3. # zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
  4. zookeeper.session.timeout.ms=5000
  5. #当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
  6. zookeeper.connection.timeout.ms=10000
  7. # 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
  8. zookeeper.sync.time.ms=2000
  9. #指定消费组
  10. group.id=xxx
  11. # 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
  12. # 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true
  13. auto.commit.enable=true
  14. # 自动更新时间。默认60 * 1000
  15. auto.commit.interval.ms=1000
  16. # 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
  17. conusmer.id=xxx
  18. # 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
  19. client.id=xxxx
  20. # 最大取多少块缓存到消费者(默认10)
  21. queued.max.message.chunks=50
  22. # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数.
  23. rebalance.max.retries=5
  24. # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
  25. fetch.min.bytes=6553600
  26. # 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
  27. fetch.wait.max.ms=5000
  28. socket.receive.buffer.bytes=655360
  29. # 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
  30. auto.offset.reset=smallest
  31. # 指定序列化处理类
  32. derializer.class=kafka.serializer.DefaultDecoder

4、启动kafka

注意:先zookeeper在启动,再启动kafka

进入kafka目录:F:\tool\kafka_2.13-2.4.1

运行命令:

  1. .\bin\windows\kafka-server-start.bat .\config\server.properties

可能会报错:“找不到或无法加载主类 Files\java\jdk-9.0.1\lib;C:\Program”

解决(3)的办法:

  1. kafka安装目录中找到bin\\windows目录中的kafka-run-class.bat为%CLASSPATH%加上双引号(可用Matlab打开,并进行搜索)
  2. 修改前:setCOMMAND=%JAVA%%KAFKA\_HEAP\_OPTS% %KAFKA\_JVM\_PERFORMANCE\_OPTS% %KAFKA\_JMX\_OPTS%%KAFKA\_LOG4J\_OPTS% -cp%CLASSPATH% %KAFKA\_OPTS% %\*
  3. 修改后:SetCOMMAND=%JAVA%%KAFKA\_HEAP\_OPTS% %KAFKA\_JVM\_PERFORMANCE\_OPTS% %KAFKA\_JMX\_OPTS%%KAFKA\_LOG4J\_OPTS% -cp"%CLASSPATH%"%KAFKA\_OPTS% %\*

修改之后重新启动

启动成功!

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2JhYnlsb3Zld2Vp_size_16_color_FFFFFF_t_70 2

发表评论

表情:
评论列表 (有 0 条评论,242人围观)

还没有评论,来说两句吧...

相关阅读