【Kafka】Docker安装kafka&java kafka api

Dear 丶 2023-10-12 15:12 244阅读 0赞

内容目录

    • 一、安装zookeeper
      • 1 拉取镜像
      • 2 创建network
      • 3 启动容器
    • 二、安装kafka
      • 1 拉取kafka镜像
      • 2 启动kafka容器
      • 3 创建topic
      • 4 创建生产者
      • 5 创建消费者
    • 三、kafka的java api
      • 1 producer
      • 2 消费者

docker依赖于zookeeper,首先安装zookeeper

一、安装zookeeper

1 拉取镜像

在这里插入图片描述

2 创建network

在启动之前,先指定一个网络

  1. docker network create app-tier --driver bridge

3 启动容器

启动zookeeper容器

  1. docker run -d --name zookeeper-server
  2. --network app-tier
  3. -p 2181:2181
  4. -e ALLOW_ANONYMOUS_LOGIN=yes
  5. bitnami/zookeeper:latest

测试是否成功
进入zookeeper

  1. docker exec -it zookeeper-server /bin/sh

执行代码

  1. zkCli.sh -server 10.249.53.1

二、安装kafka

1 拉取kafka镜像

在这里插入图片描述

2 启动kafka容器

  1. docker run -d --name kafka \
  2. -p 9092:9092 \
  3. -e KAFKA_BROKER_ID=0 \
  4. -e ALLOW_PLAINTEXT_LISTENER=yes \
  5. -e KAFKA_ZOOKEEPER_CONNECT=10.249.53.1:2181 \
  6. -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.249.53.1:9092 \
  7. -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  8. bitnami/kafka:latest

进入kafka

  1. docker exec -it kafka /bin/sh

3 创建topic

-- 创建topic

  1. ./kafka-topics.sh --bootstrap-server 10.249.53.1:9092 --create --replication-factor 1 --partitions 1 --topic kfk

查看topic
-- 分区topic

  1. ./kafka-topics.sh --list --bootstrap-server 10.249.53.1:9092

4 创建生产者

-- 生产者

  1. ./kafka-console-producer.sh --broker-list 10.249.53.1:9092 --topic kfk

5 创建消费者

-- 消费者

  1. ./kafka-console-consumer.sh --bootstrap-server 10.249.53.1:9092 --topic kfk --from-beginning

三、kafka的java api

1 producer

  1. public class ProducerTest {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. Properties props = new Properties();
  4. //指定Kafka集群的IP地址和端口号
  5. props.put("bootstrap.servers",
  6. "10.249.53.1:9092");
  7. //指定等待所有副本节点的应答
  8. props.put("acks","all");
  9. //指定消息发送最大尝试次数
  10. props.put("retries",1);
  11. //指定一批消息处理大小
  12. props.put("batch.size",16384);
  13. //指定请求延时
  14. props.put("linger.ms",1);
  15. //指定缓存区内存大小
  16. props.put("buffer.memory",33554432);
  17. //设置key序列化
  18. props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
  19. //设置value序列化
  20. props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
  21. //生产数据
  22. KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
  23. for (int i =0; i < 50; i++){
  24. producer.send(new ProducerRecord<String, String>
  25. ("kfk",Integer.toString(i),"hello world-" + i)).get();
  26. try {
  27. Thread.sleep(500);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. producer.close();
  33. }
  34. }

2 消费者

  1. public class ConsumerTest {
  2. public static void main(String[] args) {
  3. Properties props = new Properties();
  4. props.put("bootstrap.servers", "10.249.53.1:9092");
  5. props.put("group.id", "kfk1");
  6. props.put("enable.auto.commit", "true");
  7. props.put("auto.commit.interval.ms", "1000");
  8. props.put("auto.offset.reset","earliest");
  9. props.put("key.deserializer",
  10. "org.apache.kafka.common.serialization.StringDeserializer");
  11. props.put("value.deserializer",
  12. "org.apache.kafka.common.serialization.StringDeserializer");
  13. KafkaConsumer<String, String> consumer = new
  14. KafkaConsumer<String, String>(props);
  15. // 订阅topic
  16. consumer.subscribe(Arrays.asList("kfk"));
  17. // 消费数据
  18. while (true) {
  19. ConsumerRecords<String, String> records =
  20. consumer.poll(100);
  21. for (ConsumerRecord<String, String> record : records)
  22. System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
  23. }
  24. }
  25. }

发表评论

表情:
评论列表 (有 0 条评论,244人围观)

还没有评论,来说两句吧...

相关阅读