Kafka安全 之 Authentication using SASL/PLAIN

迷南。 2023-06-26 08:26 68阅读 0赞

Table of Contents

配置 kafka broker

配置 kafka 客户端

在生产中使用SASL/PLAIN

完整样例


SASL/PLAIN 是一种简单的用户名/密码身份验证机制,通常与 TLS 一起用于加密以实现安全身份验证。Kafka支持SASL/PLAIN的默认实现

用户名被用作配置acl等的认证主体。

配置 kafka broker

  • 将一个适当修改过的 JAAS 文件添加到每个 Kafka 代理的配置目录中,让我们将它命名为 kafka_server_jaas.conf。

    KafkaServer {

    1. org.apache.kafka.common.security.plain.PlainLoginModule required
    2. username="admin"
    3. password="admin-secret"
    4. user_admin="admin-secret"
    5. user_alice="alice-secret";

    };

这个配置定义了两个用户(admin和alice)。代理使用KafkaServer部分中的属性用户名和密码来启动到其他代理的连接。在本例中,admin是代理间通信的用户。user_userName属性集定义连接到代理的所有用户的密码,代理将验证所有客户端连接,包括使用这些属性的其他代理的连接。

  • 将JAAS配置文件位置作为JVM参数传递给每个Kafka代理:

    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

  • 在服务器中配置SASL端口和SASL机制。这里描述的属性。例如:

    listeners=SASL_SSL://host.name:port

    1. security.inter.broker.protocol=SASL_SSL
    2. sasl.mechanism.inter.broker.protocol=PLAIN
    3. sasl.enabled.mechanisms=PLAIN

配置 kafka 客户端

在客户端配置SASL认证:

  • 为生产者中的每个客户端配置JAAS配置属性。属性或consumer.properties。登录模块描述了像生产者和消费者这样的客户机如何连接到Kafka代理。下面是一个简单机制的客户端配置示例:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \

    1. username="alice" \
    2. password="alice-secret";

客户端使用用户名和密码选项来为客户端连接配置用户。在本例中,客户端作为用户alice连接到代理。JVM中的不同客户机可以通过在sasl.jaas.config中指定不同的用户名和密码来连接为不同的用户。

客户端的JAAS配置也可以指定为类似于这里描述的代理的JVM参数。客户端使用名为KafkaClient的登录部分。此选项仅允许一个用户连接来自JVM的所有客户机连接。

  • 在 producer.properties or consumer.properties 中配置一下属性:

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN

在生产中使用SASL/PLAIN

  • SASL/PLAIN 应仅与 SSL 作为传输层一起使用,以确保在未加密的情况下不会在网络上传输清晰的密码。
  • Kafka 中 SASL/PLAIN 的默认实现指定 JAAS 配置文件中的用户名和密码,如下所示。为了避免将密码存储在磁盘上,可以实现 javax.security.auth.spi.LoginModule 提供来自外部源的用户名和密码。登录模块实现应该提供用户名作为公共凭证,密码作为主题的私有凭证。可以使用默认实现的 org.apache.kafka.common.security.plain.PlainLoginModule 作为示例。
  • 在生产系统中,外部身份验证服务器可以实现密码验证。通过添加您自己的 javax.security.sasl.SaslServer 实现,Kafka代理可以与这些服务器集成。可以使用 org.apache.kafka.common.security.plain 包中包含的默认实现作为开始的示例。

    • 必须在JVM中安装并注册新的提供程序。提供程序可以通过将提供程序类添加到普通的类路径中来安装,也可以绑定为jar文件并添加到JAVA_HOME/lib/ext中。
    • 提供程序可以通过将提供程序添加到安全属性文件 JAVA_HOME/lib/security/java.security 来静态注册。

      1. security.provider.n=providerClassName。其中providerClassName是新提供者的完全限定名,n是优先级顺序,数字越低表示优先级越高。
    • 或者,您可以通过调用安全性在运行时动态注册提供者。在客户端应用程序的开头或在登录模块的静态初始化器中添加addProvider。例如:

      1. Security.addProvider(new PlainSaslServerProvider());

完整样例

初始化系统环境变量

  1. private def initSystem: Properties = {
  2. val sysProp = CommonPro.loadProperties
  3. if (!StringUtils.isBlank(sysProp.getProperty("java.security.auth.login.config"))) {
  4. System.setProperty("java.security.auth.login.config", sysProp.getProperty("java.security.auth.login.config"))
  5. }
  6. if (!StringUtils.isBlank(sysProp.getProperty("java.security.krb5.conf"))) {
  7. System.setProperty("java.security.krb5.conf", sysProp.getProperty("java.security.krb5.conf"))
  8. }
  9. if (!StringUtils.isBlank(sysProp.getProperty("sun.security.krb5.debug"))) {
  10. System.setProperty("sun.security.krb5.debug", sysProp.getProperty("sun.security.krb5.debug"))
  11. }
  12. if (!StringUtils.isBlank(sysProp.getProperty("javax.security.auth.useSubjectCredsOnly"))) {
  13. System.setProperty("javax.security.auth.useSubjectCredsOnly", sysProp.getProperty("javax.security.auth.useSubjectCredsOnly"))
  14. }
  15. sysProp
  16. }

初始化 kafka consumer source

  1. private def initConsumerSource(sysProp: Properties): FlinkKafkaConsumer010[ObjectNode] = {
  2. val properties = new Properties()
  3. properties.setProperty("bootstrap.servers", sysProp.getProperty("bootstrap.servers"))
  4. properties.setProperty("group.id", sysProp.getProperty("group.id"))
  5. properties.setProperty("enable.auto.commit", "false")
  6. properties.setProperty("session.timeout.ms", "30000")
  7. //每次poll最多获取10条数据
  8. properties.setProperty("max.poll.records", "10")
  9. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  10. properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  11. if (!StringUtils.isBlank(sysProp.getProperty("sasl.mechanism"))) {
  12. properties.setProperty("sasl.mechanism", sysProp.getProperty("sasl.mechanism"))
  13. }
  14. if (!StringUtils.isBlank(sysProp.getProperty("security.protocol"))) {
  15. properties.setProperty("security.protocol", sysProp.getProperty("security.protocol"))
  16. }
  17. if (!StringUtils.isBlank(sysProp.getProperty("sasl.jaas.config"))) {
  18. properties.setProperty("sasl.jaas.config", sysProp.getProperty("sasl.jaas.config"))
  19. }
  20. if (!StringUtils.isBlank(sysProp.getProperty("sasl.kerberos.service.name"))) {
  21. properties.setProperty("sasl.kerberos.service.name", sysProp.getProperty("sasl.kerberos.service.name"))
  22. }
  23. val myConsumer = new FlinkKafkaConsumer010("log_dls", new JSONKeyValueDeserializationSchema(true), properties)
  24. val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
  25. specificStartOffsets.put(new KafkaTopicPartition("log_dls", 0), 29L)
  26. myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
  27. // myConsumer.setStartFromEarliest()
  28. myConsumer
  29. }

配置文件

  1. zookeeper.connect=xxx
  2. bootstrap.servers=xxx
  3. group.id=op_log_consumer
  4. sasl.mechanism=PLAIN
  5. security.protocol=SASL_PLAINTEXT
  6. #sasl.kerberos.service.name=xxx
  7. sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
  8. java.security.auth.login.config=/web/jaas.conf
  9. #java.security.krb5.conf=/kerberos/krb5.conf
  10. #sun.security.krb5.debug=false
  11. #javax.security.auth.useSubjectCredsOnly=false
  12. default.hdfs.path=/user/hive/warehouse/api_log/

原文地址:https://kafka.apache.org/0110/documentation.html#security_sasl_plain_brokerconfig

发表评论

表情:
评论列表 (有 0 条评论,68人围观)

还没有评论,来说两句吧...

相关阅读