Kafka 0.10.0.0

1. Installing Kafka

1) Download: http://apache.fayea.com/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz

2) Extract the archive:

    tar -xzf kafka_2.11-0.10.0.0.tgz
    cd kafka_2.11-0.10.0.0

2. Starting ZooKeeper

Kafka requires ZooKeeper, so you need to start a ZooKeeper server first. If you don't already have one, you can use the ZooKeeper instance that ships with Kafka, already packaged and configured:

    bin/zookeeper-server-start.sh config/zookeeper.properties

3. Starting Kafka

    bin/kafka-server-start.sh config/server.properties

4. Writing the Producer

pom.xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>

Produce.java

    package kafka.produce;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class Produce {

        public static void main(String[] args) {
            System.out.println("begin produce");
            connectionKafka();
            System.out.println("finish produce");
        }

        public static void connectionKafka() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // broker to connect to
            props.put("acks", "all");                            // wait for all in-sync replicas to acknowledge
            props.put("retries", 0);                             // do not retry failed sends
            props.put("batch.size", 16384);                      // per-partition batch size in bytes
            props.put("linger.ms", 1);                           // wait up to 1 ms for a batch to fill
            props.put("buffer.memory", 33554432);                // 32 MB of total buffer memory
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            // send 10 records to "my-topic"; key and value are both the loop index as a string
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.close();
        }
    }
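
Note that send() is asynchronous and only returns a Future<RecordMetadata>, so the loop above gets no feedback when a record fails to be written. As a minimal sketch (an addition to the original example, using the Callback and RecordMetadata classes from kafka-clients 0.10.0.0), the send inside the loop can be given a delivery callback:

    // drop-in replacement for producer.send(...) inside the for loop above;
    // requires two extra imports: org.apache.kafka.clients.producer.Callback
    // and org.apache.kafka.clients.producer.RecordMetadata
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)),
            new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();   // the record could not be written
                    } else {
                        System.out.println("sent to partition " + metadata.partition()
                                + ", offset " + metadata.offset());
                    }
                }
            });

Alternatively, producer.send(record).get() blocks until the broker acknowledges the record and throws an exception if the write failed.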

5. Writing the Consumer

pom.xml

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>

Consumer.java

    package kafka.consumer;

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class Consumer {

        public static void main(String[] args) {
            System.out.println("begin consumer");
            connectionKafka();
            System.out.println("finish consumer");
        }

        @SuppressWarnings("resource")
        public static void connectionKafka() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "testConsumer");              // consumer group id
            props.put("enable.auto.commit", "true");            // commit offsets automatically
            props.put("auto.commit.interval.ms", "1000");       // auto-commit every second
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("my-topic", "test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                try {
                    // simulate slow processing; in 0.10.0.0 heartbeats are only sent from poll(),
                    // so this pause has to stay below session.timeout.ms
                    Thread.sleep(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
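
With enable.auto.commit set to true, offsets are committed on a timer, so records read shortly before a crash may be re-delivered or skipped. A minimal variation (not part of the original post) that commits offsets manually with commitSync(), which the 0.10.0.0 consumer API provides, would replace the polling loop above with something like:

    // assumes the same props as above, except:
    props.put("enable.auto.commit", "false");   // take over offset management

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic", "test-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
        // commit the offsets returned by this poll only after every record
        // in the batch has been processed
        consumer.commitSync();
    }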

6. Notes

The kafka-clients version configured in the client's pom.xml must match the Kafka broker version (0.10.0.0 here); a mismatched client fails with errors like the following.

1) Error 1:

    Error reading field 'topic_metadata': Error reading array of size 1139567, only 45 bytes available

2) Error 2:

    Exception in thread "main" org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': Error reading string of length 12601, only 210 bytes available
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
        at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at kafka.consumer.Consumer.connectionKafka(Consumer.java:32)
        at kafka.consumer.Consumer.main(Consumer.java:14)

7. Shell Commands

Start the Kafka console producer from the shell:

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic zkTopic

Start the Kafka console consumer from the shell (with 0.10.0.0, pass either --zookeeper for the old consumer or --bootstrap-server for the new consumer, not both):

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic zkTopic --from-beginning

Describe a topic's partitions:

    ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic zkTopic1

Output:

    Topic:zkTopic1    PartitionCount:4    ReplicationFactor:1    Configs:
        Topic: zkTopic1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
        Topic: zkTopic1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
        Topic: zkTopic1    Partition: 2    Leader: 0    Replicas: 0    Isr: 0
        Topic: zkTopic1    Partition: 3    Leader: 0    Replicas: 0    Isr: 0

8. References

1) Official documentation: http://kafka.apache.org/documentation.html#producerconfigs
2) Chinese translation: http://orchome.com/295
