Java代码kafka通过Kerberos认证消费数据

今天药忘吃喽~ 2024-04-28 13:08 122阅读 0赞

Java代码kafka通过Kerberos认证消费数据

kafka消费数据

  1. package com.beijingbank.rcp.hbase;
  2. import java.io.PrintStream;
  3. import java.time.Duration;
  4. import java.util.Collections;
  5. import java.util.Iterator;
  6. import java.util.Properties;
  7. import org.apache.kafka.clients.consumer.ConsumerConfig;
  8. import org.apache.kafka.clients.consumer.ConsumerRecord;
  9. import org.apache.kafka.clients.consumer.ConsumerRecords;
  10. import org.apache.kafka.clients.consumer.KafkaConsumer;
  11. import org.apache.kafka.common.serialization.StringDeserializer;
  12. public class KafkaConsumerExample
  13. {
  14. public static void main(String[] args)
  15. {
  16. System.setProperty("java.security.krb5.conf", "kerberos/krb5.conf");
  17. Properties properties = new Properties();
  18. properties.setProperty("bootstrap.servers", "aaaa:9092,bbbb:9092,ccc:9092");
  19. properties.setProperty("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"kerberos/rcp.keytab\" \n principal=\"hadoop@HADOOP.COM\";");
  20. properties.setProperty("sasl.kerberos.service.name", "kafka");
  21. properties.setProperty("sasl.mechanism", "GSSAPI");
  22. properties.setProperty("security.protocol", "SASL_PLAINTEXT");
  23. properties.setProperty("auto.offset.reset", "earliest");
  24. properties.setProperty("group.id", "test_003");
  25. Properties properties1 = ConsumerConfig.addDeserializerToConfig(properties, new StringDeserializer(), new StringDeserializer());
  26. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties1);
  27. kafkaConsumer.subscribe(Collections.singleton("rcp_test_002"));
  28. for (;;)
  29. {
  30. ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.ofMillis(1000L));
  31. if (!poll.isEmpty())
  32. {
  33. Iterator<ConsumerRecord<String, String>> iterator = poll.iterator();
  34. while (iterator.hasNext())
  35. {
  36. ConsumerRecord<String, String> next = (ConsumerRecord)iterator.next();
  37. System.out.println((String)next.value());
  38. }
  39. }
  40. }
  41. }
  42. }

kafka发送数据

  1. package com.td.hadoop.hbase;
  2. import java.io.PrintStream;
  3. import java.util.Properties;
  4. import java.util.Random;
  5. import java.util.Scanner;
  6. import org.apache.kafka.clients.producer.KafkaProducer;
  7. import org.apache.kafka.clients.producer.ProducerConfig;
  8. import org.apache.kafka.clients.producer.ProducerRecord;
  9. import org.apache.kafka.common.serialization.StringSerializer;
  10. public class KafkaProducerExample
  11. {
  12. public static void main(String[] args)
  13. {
  14. System.setProperty("java.security.krb5.conf", "kerberos/krb5.conf");
  15. Properties properties = new Properties();
  16. properties.setProperty("bootstrap.servers", "aaaa:9092,bbbb:9092,cccc:9092");
  17. properties.setProperty("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"kerberos/rcp.keytab\" \n principal=\"hadoop@HADOOP.COM\";");
  18. properties.setProperty("sasl.kerberos.service.name", "kafka");
  19. properties.setProperty("sasl.mechanism", "GSSAPI");
  20. properties.setProperty("security.protocol", "SASL_PLAINTEXT");
  21. Properties properties1 = ProducerConfig.addSerializerToConfig(properties, new StringSerializer(), new StringSerializer());
  22. KafkaProducer<String, String> stringStringKafkaProducer = new KafkaProducer(properties1);
  23. Random random = new Random();
  24. Scanner scanner = new Scanner(System.in);
  25. while (scanner.hasNext())
  26. {
  27. String next = scanner.next();
  28. ProducerRecord<String, String> rcp_test_001 = new ProducerRecord("rcp_test_002", random.nextInt() + "", next);
  29. stringStringKafkaProducer.send(rcp_test_001);
  30. stringStringKafkaProducer.flush();
  31. }
  32. }
  33. }

发表评论

表情:
评论列表 (有 0 条评论,122人围观)

还没有评论,来说两句吧...

相关阅读

    相关 kerberos认证原理

    前几天在给人解释Windows是如何通过Kerberos进行Authentication的时候,讲了半天也没把那位老兄讲明白,还差点把自己给绕进去。后来想想原因有以下两点:对于

    相关 安全认证--Kerberos

    功能介绍: Kerberos这一名词来源于希腊神话“三个头的狗——地狱之门守护者”,后来沿用作为安全认证的概念,使用Kerberos的系统在设计上采用“客户端/服务器”结构