Kafka安全 之 Authentication using SASL/PLAIN
Table of Contents
配置 kafka broker
配置 kafka 客户端
在生产中使用SASL/PLAIN
完整样例
SASL/PLAIN 是一种简单的用户名/密码身份验证机制,通常与 TLS 一起用于加密以实现安全身份验证。Kafka支持SASL/PLAIN的默认实现
用户名被用作配置acl等的认证主体。
配置 kafka broker
将一个适当修改过的 JAAS 文件添加到每个 Kafka 代理的配置目录中,让我们将它命名为 kafka_server_jaas.conf。
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
这个配置定义了两个用户(admin和alice)。代理使用KafkaServer部分中的属性用户名和密码来启动到其他代理的连接。在本例中,admin是代理间通信的用户。user_userName属性集定义连接到代理的所有用户的密码,代理将验证所有客户端连接,包括使用这些属性的其他代理的连接。
将JAAS配置文件位置作为JVM参数传递给每个Kafka代理:
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
在服务器中配置SASL端口和SASL机制。这里描述的属性。例如:
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
配置 kafka 客户端
在客户端配置SASL认证:
为生产者中的每个客户端配置JAAS配置属性。属性或consumer.properties。登录模块描述了像生产者和消费者这样的客户机如何连接到Kafka代理。下面是一个简单机制的客户端配置示例:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="alice-secret";
客户端使用用户名和密码选项来为客户端连接配置用户。在本例中,客户端作为用户alice连接到代理。JVM中的不同客户机可以通过在sasl.jaas.config中指定不同的用户名和密码来连接为不同的用户。
客户端的JAAS配置也可以指定为类似于这里描述的代理的JVM参数。客户端使用名为KafkaClient的登录部分。此选项仅允许一个用户连接来自JVM的所有客户机连接。
在 producer.properties or consumer.properties 中配置一下属性:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
在生产中使用SASL/PLAIN
- SASL/PLAIN 应仅与 SSL 作为传输层一起使用,以确保在未加密的情况下不会在网络上传输清晰的密码。
- Kafka 中 SASL/PLAIN 的默认实现指定 JAAS 配置文件中的用户名和密码,如下所示。为了避免将密码存储在磁盘上,可以实现
javax.security.auth.spi.LoginModule
提供来自外部源的用户名和密码。登录模块实现应该提供用户名作为公共凭证,密码作为主题的私有凭证。可以使用默认实现的 org.apache.kafka.common.security.plain.PlainLoginModule 作为示例。 在生产系统中,外部身份验证服务器可以实现密码验证。通过添加您自己的 javax.security.sasl.SaslServer 实现,Kafka代理可以与这些服务器集成。可以使用 org.apache.kafka.common.security.plain 包中包含的默认实现作为开始的示例。
- 必须在JVM中安装并注册新的提供程序。提供程序可以通过将提供程序类添加到普通的类路径中来安装,也可以绑定为jar文件并添加到JAVA_HOME/lib/ext中。
提供程序可以通过将提供程序添加到安全属性文件 JAVA_HOME/lib/security/java.security 来静态注册。
security.provider.n=providerClassName。其中providerClassName是新提供者的完全限定名,n是优先级顺序,数字越低表示优先级越高。
或者,您可以通过调用安全性在运行时动态注册提供者。在客户端应用程序的开头或在登录模块的静态初始化器中添加addProvider。例如:
Security.addProvider(new PlainSaslServerProvider());
完整样例
初始化系统环境变量
private def initSystem: Properties = {
val sysProp = CommonPro.loadProperties
if (!StringUtils.isBlank(sysProp.getProperty("java.security.auth.login.config"))) {
System.setProperty("java.security.auth.login.config", sysProp.getProperty("java.security.auth.login.config"))
}
if (!StringUtils.isBlank(sysProp.getProperty("java.security.krb5.conf"))) {
System.setProperty("java.security.krb5.conf", sysProp.getProperty("java.security.krb5.conf"))
}
if (!StringUtils.isBlank(sysProp.getProperty("sun.security.krb5.debug"))) {
System.setProperty("sun.security.krb5.debug", sysProp.getProperty("sun.security.krb5.debug"))
}
if (!StringUtils.isBlank(sysProp.getProperty("javax.security.auth.useSubjectCredsOnly"))) {
System.setProperty("javax.security.auth.useSubjectCredsOnly", sysProp.getProperty("javax.security.auth.useSubjectCredsOnly"))
}
sysProp
}
初始化 kafka consumer source
private def initConsumerSource(sysProp: Properties): FlinkKafkaConsumer010[ObjectNode] = {
val properties = new Properties()
properties.setProperty("bootstrap.servers", sysProp.getProperty("bootstrap.servers"))
properties.setProperty("group.id", sysProp.getProperty("group.id"))
properties.setProperty("enable.auto.commit", "false")
properties.setProperty("session.timeout.ms", "30000")
//每次poll最多获取10条数据
properties.setProperty("max.poll.records", "10")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
if (!StringUtils.isBlank(sysProp.getProperty("sasl.mechanism"))) {
properties.setProperty("sasl.mechanism", sysProp.getProperty("sasl.mechanism"))
}
if (!StringUtils.isBlank(sysProp.getProperty("security.protocol"))) {
properties.setProperty("security.protocol", sysProp.getProperty("security.protocol"))
}
if (!StringUtils.isBlank(sysProp.getProperty("sasl.jaas.config"))) {
properties.setProperty("sasl.jaas.config", sysProp.getProperty("sasl.jaas.config"))
}
if (!StringUtils.isBlank(sysProp.getProperty("sasl.kerberos.service.name"))) {
properties.setProperty("sasl.kerberos.service.name", sysProp.getProperty("sasl.kerberos.service.name"))
}
val myConsumer = new FlinkKafkaConsumer010("log_dls", new JSONKeyValueDeserializationSchema(true), properties)
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("log_dls", 0), 29L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
// myConsumer.setStartFromEarliest()
myConsumer
}
配置文件
zookeeper.connect=xxx
bootstrap.servers=xxx
group.id=op_log_consumer
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
#sasl.kerberos.service.name=xxx
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
java.security.auth.login.config=/web/jaas.conf
#java.security.krb5.conf=/kerberos/krb5.conf
#sun.security.krb5.debug=false
#javax.security.auth.useSubjectCredsOnly=false
default.hdfs.path=/user/hive/warehouse/api_log/
原文地址:https://kafka.apache.org/0110/documentation.html#security_sasl_plain_brokerconfig
还没有评论,来说两句吧...