windows下kafka安装启动以及使用

深藏阁楼爱情的钟 2021-09-21 06:42 646阅读 0赞

Zookeeper

kafka用到了zookeeper,现在的kafka会自带zookeeper,如果我们要自己安装的话 也可以

zookeeper的下载地址为:https://zookeeper.apache.org/releases.html#download

下载成功之后解压,j进入bin目录,然后通过下面的命令启动zookeeper.

  1. zkServer.cmd

在zookeeper使用时,可能会遇到一些错误,关于这些错误的问题可以参考博主的这篇博客:zookeeper安装启动的一些问题

Kafka

然后我们需要需要下载kafka,kafka的下载地址为:http://kafka.apache.org/downloads

解压并进入Kafka目录,笔者:D:\Kafka\kafka_2.12-0.11.0.0

进入config目录找到文件server.properties并打开

找到并编辑zookeeper.connect=localhost:2181

Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181

进入Kafka安装目录D:\Kafka\kafka_2.12-0.11.0.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:

  1. bin\windows\kafka-server-start.bat config\server.properties

kafka有自带的zookeeper,启动自带的zookeeper可以通过如下命令:

  1. bin\windows\zookeeper-server-start.bat config\zookeeper.properties

如果在执行该命令时报错:

  1. The syntax of the command is incorrect.

则有可能是kafka安装路径中的横杠导致的,这时候我们将kafka拷贝到一个没有横杠的路径下即可。参考:https://stackoverflow.com/questions/20105735/error-the-syntax-of-the-command-is-incorrect-when-renaming-a-file

如果安装没有问题了,使用命令启动单点集群的kafka时,报错:

  1. Connection to node 0 (host.docker.internal/10.130.228.45:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient

博主解决的办法是打开,在kafka安装路径下的config/server.properties文件中,修改下面的配置为:

  1. listeners=PLAINTEXT://localhost:9092

参考:https://stackoverflow.com/questions/47677549/kafka-zookeeper-connection-to-node-1-could-not-be-established-broker-may-no

创建一个topic

命令为:

  1. bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

测试该topic是否创建成功:

  1. bin\windows\kafka-topics.sh --list --bootstrap-server localhost:9092

发送message

  1. > bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
  2. This is a message
  3. This is another message

接收message

  1. > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  2. This is a message
  3. This is another message

启动kafka集群

在本地拷贝两个server.properties

  1. > copy config\server.properties config\server-1.properties
  2. > copy config\server.properties config/server-2.properties

更改下面的参数

  1. config/server-1.properties:
  2. broker.id=1
  3. listeners=PLAINTEXT://XXX:9093
  4. log.dirs=/tmp/kafka-logs-1
  5. config/server-2.properties:
  6. broker.id=2
  7. listeners=PLAINTEXT://XXX:9094
  8. log.dirs=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的唯一且永久的名称。 我们只需要覆盖端口和日志目录,这是因为我们都在同一台计算机上运行它们,并且希望所有代理都不要试图在同一端口上注册或覆盖彼此的数据。

我们有Zookeeper并启动了单个节点,因此我们只需要启动两个新节点:

  1. > bin\kafka-server-start.bat config\server-1.properties
  2. ...
  3. > bin\kafka-server-start.bat config\server-2.properties
  4. ...

现在,创建一个具有三个复制factor的新topic:

  1. bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好的,但是现在有了集群,我们如何知道哪个broker在做什么?我们需要运行“describe topics”命令:

  1. bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
  2. Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
  3. Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0

第一行给出了所有分区的摘要,每一行都给出了有关一个分区的信息。 由于该主题只有一个分区,因此只有一行。

  • “leader”是负责给定分区的所有读取和写入的节点。 每个节点将成为分区的随机选择部分的领导者。
  • “replics”是为该分区复制日志的节点列表,无论它们是引导者还是当前处于活动状态。
  • “ isr”是“同步”副本的集合。 这是副本列表的子集,当前仍处于活动状态并追随领导者。

请注意,在我的示例中,节点2是topic唯一分区的leader。我们可以在创建的之前的topic 上运行相同的命令,以查看其信息

  1. kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test
  2. Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
  3. Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

因此,如我们期待的一样-之前的topic没有replicas 副本,并且位于服务器0上,这是我们创建群集时集群中唯一的服务器。

让我们向我们的新topic发布一些消息:

  1. kafka-console-producer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic
  2. >my test message 1
  3. >my test message 2
  4. >Terminate batch job (Y/N)? Y

然后consume这些message

  1. kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic --from-beginning
  2. my test message 1
  3. my test message 2
  4. Processed a total of 2 messages
  5. Terminate batch job (Y/N)? Y

现在让我们测试一下容错能力。 broker 2扮演的是leader的角色,所以让我们kill它:

  1. wmic process where "caption = 'java.exe' and commandline like '%server-2.properties%'" get processid
  2. ProcessId
  3. 8760
  4. C:\Users\tools\kafka>taskkill /pid 8760 /f
  5. SUCCESS: The process with PID 8760 has been terminated.

在kill之后,leader自动变为了server 1

  1. kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
  2. Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
  3. Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0

但是,即使最初进行写操作的leader已经down掉了,message仍然可供使用:

  1. kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic --from-beginning
  2. my test message 1
  3. my test message 2

原因在于,我们设置了备份。

发表评论

表情:
评论列表 (有 0 条评论,646人围观)

还没有评论,来说两句吧...

相关阅读