Kerberos authentication: configuring Flink's Kafka connector and the Kafka client



Part 1: the flink-connector-kafka approach

1. Kafka JAAS configuration file

The Kafka JAAS entry must be configured; if it is missing, the following error is thrown:

    Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

With the Flink Kafka connector, the JAAS file can only be supplied through the java.security.auth.login.config system property. There are two ways to set it.

1.1 Option 1: set the system property in code

Configure the system property with System.setProperty:

    System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");

The contents of kafka_client_jaas_keytab.conf are as follows:

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        useTicketCache=false
        storeKey=true
        keyTab="D:/configs/xxx.keytab"
        principal="xxx@XXXXXX.COM"
        serviceName="kafka";
    };
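If login problems are hard to diagnose from inside Flink, the JAAS entry can be sanity-checked on its own. The snippet below is not from the original article; it is a minimal sketch that drives the same KafkaClient entry through the standard JAAS LoginContext API, so a bad keytab path or principal fails fast with a plain LoginException:

    // Hypothetical sanity check for the JAAS entry, independent of Kafka.
    System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
    // "KafkaClient" is the entry name inside the JAAS file.
    javax.security.auth.login.LoginContext lc = new javax.security.auth.login.LoginContext("KafkaClient");
    lc.login(); // throws LoginException if the keytab path or principal is wrong
    System.out.println("Logged in as: " + lc.getSubject().getPrincipals());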

1.2 Option 2: add a JVM argument in IDEA:

    -Djava.security.auth.login.config=D:\configs\kafka_client_jaas_keytab.conf


Note: adding the setting to the Kafka Properties object is wrong. java.security.auth.login.config is a JVM system property read by the JAAS framework, not a Kafka client option, so the following has no effect:

    Properties properties = new Properties();
    properties.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, simpleStringSchema, properties);

2. Flink Kerberos configuration

2.1 Configure JVM options in IDEA

Add the following to the IDEA run configuration's VM options:

    -Dsecurity.kerberos.krb5-conf.path=D:\configs\krb5.conf -Dsecurity.kerberos.login.keytab=D:\configs\xxx.keytab -Dsecurity.kerberos.login.principal=xxx@XXXXXX.COM
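
For a job submitted to a cluster rather than run from the IDE, the same options normally go into Flink's flink-conf.yaml instead of JVM arguments. A sketch, where the cluster-side paths are assumptions:

    security.kerberos.krb5-conf.path: /etc/krb5.conf
    security.kerberos.login.keytab: /opt/keytabs/xxx.keytab
    security.kerberos.login.principal: xxx@XXXXXX.COM
    security.kerberos.login.contexts: Client,KafkaClient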
2.2 Pass the configuration to the StreamExecutionEnvironment

Alternatively, pass the parameters directly when creating the Flink StreamExecutionEnvironment. Flink uses security.kerberos.login.contexts to generate JAAS entries for the listed contexts (Client for ZooKeeper, KafkaClient for Kafka), so no external JAAS file is needed:

    Properties flinkProps = new Properties();
    flinkProps.setProperty("security.kerberos.krb5-conf.path", "D:\\configs\\krb5.conf");
    flinkProps.setProperty("security.kerberos.login.keytab", "D:\\configs\\xxx.keytab");
    flinkProps.setProperty("security.kerberos.login.principal", "xxx@XXXXXX.COM");
    flinkProps.setProperty("security.kerberos.login.contexts", "Client,KafkaClient");
    flinkProps.setProperty("state.backend", "hashmap");
    // Configuration flinkConfig = ConfigUtils.getFlinkConfig();
    Configuration flinkConfig = new Configuration();
    // Copy the properties into the Configuration. Note that
    // Configuration.addAllToProperties copies the other way (Configuration -> Properties),
    // so it would leave flinkConfig empty here.
    flinkProps.forEach((k, v) -> flinkConfig.setString(k.toString(), v.toString()));
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

3. Checking that the connection succeeded

When the Kafka connection succeeds, you should see log lines like these:

    09:38:26.473 [Sink: Unnamed (6/8)#0] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
    ... ...
    09:38:27.534 [kafka-producer-network-thread | producer-3] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-3] Cluster ID: vj0AfElIS12S0Cp0WDBU7Q
    ... ...
    09:38:27.618 [kafka-kerberos-refresh-thread-xxx@XXXXXX.COM] WARN org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=xxx@XXXXXX.COM]: TGT renewal thread has been interrupted and will exit.

4. A ticket-cache JAAS configuration does not work

Note: a JAAS file in the ticket-cache style shown below does not work, even though Flink has already been configured with the Kerberos principal and keytab.

    System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_cache.conf");

Contents of kafka_client_jaas_cache.conf:

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useTicketCache=true
        renewTicket=true
        serviceName="kafka";
    };

It fails with the following error: useTicketCache=true makes Krb5LoginModule look for a kinit-style ticket cache, and since none exists in the JVM, the module falls back to prompting for a password, which the Kafka client cannot supply.

    Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user

Full example code:

    @Test
    public void testWrite() throws Exception {
        // JVM options: -Dsecurity.kerberos.krb5-conf.path=D:\configs\krb5.conf -Dsecurity.kerberos.login.keytab=D:\configs\xxx.keytab -Dsecurity.kerberos.login.principal=xxx@XXXXXX.COM
        // System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
        Properties flinkProps = new Properties();
        flinkProps.setProperty("security.kerberos.krb5-conf.path", "D:\\configs\\krb5.conf");
        flinkProps.setProperty("security.kerberos.login.keytab", "D:\\configs\\xxx.keytab");
        flinkProps.setProperty("security.kerberos.login.principal", "xxx@XXXXXX.COM");
        flinkProps.setProperty("security.kerberos.login.contexts", "Client,KafkaClient");
        flinkProps.setProperty("state.backend", "hashmap");
        // Configuration flinkConfig = ConfigUtils.getFlinkConfig();
        Configuration flinkConfig = new Configuration();
        // Copy the properties into the Configuration (addAllToProperties copies the other way).
        flinkProps.forEach((k, v) -> flinkConfig.setString(k.toString(), v.toString()));
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        // Wrong way to configure JAAS with the flink-connector-kafka API:
        // properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG_KEYTAB_TEMPLATE, "D:\\configs\\xxx.keytab", "xxx@XXXXXX.COM"));
        String topic = "flinkcdc";
        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, simpleStringSchema, properties);
        senv.fromElements("hello world", "coming again").addSink(producer);
        senv.execute("test");
    }
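
A side note not in the original article: FlinkKafkaProducer has been deprecated since Flink 1.14 in favor of KafkaSink. A minimal sketch of the equivalent sink, assuming the same brokers and topic, with the Kerberos options still supplied through the Flink configuration or JVM arguments as above:

    // Sketch only: KafkaSink is the newer API replacing FlinkKafkaProducer.
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("flinkcdc")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            // Kafka client security settings are passed through as producer properties.
            .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
            .setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI")
            .setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
            .build();
    senv.fromElements("hello world", "coming again").sinkTo(sink);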

Part 2: the kafka-client approach

1. Kafka JAAS configuration

Either the Java system property java.security.auth.login.config or Kafka's sasl.jaas.config property works here. Note the precedence, though: sasl.jaas.config > java.security.auth.login.config. If sasl.jaas.config is set, java.security.auth.login.config is ignored.

On to the code. First, note that the path separator inside sasl.jaas.config must be /, not \\:

    Wrong:   D:\\configs\\kafka_client_jaas_keytab.conf
    Correct: D:/configs/kafka_client_jaas_keytab.conf

    private static final String JAAS_CONFIG_KEYTAB_TEMPLATE =
            "com.sun.security.auth.module.Krb5LoginModule required\n" +
            "debug=true\n" +
            "doNotPrompt=true\n" +
            "storeKey=true\n" +
            "useKeyTab=true\n" +
            "keyTab=\"%s\"\n" +
            "principal=\"%s\";";

    @Test
    public void testKafkaWrite() {
        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Pick exactly one of the following two options.
        System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
        // properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG_KEYTAB_TEMPLATE, "D:/configs/xxx.keytab", "xxx@XXXXXX.COM"));
        try {
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            ProducerRecord<String, String> record1 = new ProducerRecord<>("flinkcdc", "hello kafka");
            ProducerRecord<String, String> record2 = new ProducerRecord<>("flinkcdc", "coming soon");
            Future<RecordMetadata> f1 = producer.send(record1);
            Future<RecordMetadata> f2 = producer.send(record2);
            producer.flush();
            List<Future<RecordMetadata>> fs = new ArrayList<>();
            fs.add(f1);
            fs.add(f2);
            for (Future<RecordMetadata> future : fs) {
                RecordMetadata metadata = future.get();
                System.out.println(metadata.toString());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

The contents of kafka_client_jaas_keytab.conf are the same as in the flink-connector-kafka section above.
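
For completeness, a matching plain-Java consumer can be sketched with the same security settings. This is not from the original article; the group id and poll timeout are just examples:

    @Test
    public void testKafkaRead() {
        Properties properties = new Properties();
        properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
        properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tester");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Same JAAS options as the producer test.
        System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("flinkcdc"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }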

Part 3: Kafka console commands

Console producer launch command:

    bin/kafka-console-producer.sh --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 --topic flinkcdc --producer-property security.protocol=SASL_PLAINTEXT

Console consumer launch command:

    bin/kafka-console-consumer.sh --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 --topic flinkcdc --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --group tester
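
Note that the console tools also need a JAAS file for Kerberos. One common way, not shown in the original commands, is to export it through the KAFKA_OPTS environment variable before launching (the file path here is an assumption):

    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas_keytab.conf"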
