Kafka安全认证配置
一. 背景
Kafka 提供了多种安全认证机制,主要分为 SSL 和 SASL 两大类。其中 SASL/PLAIN 是基于账号密码的认证方式,使用较为广泛。
⚠️ 注意:Kafka 增加认证前务必提前配置好
/etc/hosts
的地址解析,否则很容易因认证导致连接超时。
二. 环境
- 操作系统:Linux
- Kafka 版本:
kafka_2.12-3.5.1
- Zookeeper 版本:
zookeeper-3.7.2
三. 认证步骤
3.1 Zookeeper 配置认证和启动
⚠️ Zookeeper 可不配置认证。
1. 修改 zoo.cfg
配置文件,添加以下内容:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
2. 新建认证配置文件 /conf/zk_server_jaas.conf
:
username
和password
是 Zookeeper 集群内部认证信息。user_kafka="kafkapasswd"
定义 Kafka 用户供客户端使用,需与 Kafka 配置文件一致。- ⚠️ 结尾的分号 不可遗漏!
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cluster"
password="clusterpasswd"
user_kafka="kafkapasswd";
};
3. 拷贝 Kafka 的认证相关 Jar 包到 Zookeeper 的 lib
目录
从 Kafka 安装目录的 libs/
拷贝以下文件至 Zookeeper 的 lib/
:
kafka-clients-3.5.1.jar
lz4-java-1.8.0.jar
slf4j-api-1.7.36.jar
slf4j-reload4j-1.7.36.jar
snappy-java-1.1.10.1.jar
4. 修改 bin/zkEnv.sh
,添加:
SERVER_JVMFLAGS="-Djava.security.auth.login.config=/data/apache-zookeeper-3.7.2-bin/conf/zk_server_jaas.conf"
5. 启动 Zookeeper:
bin/zkServer.sh start
3.2 Kafka 配置和启动
1. 创建 config/kafka_server_jaas.conf
:
- 第一行为 Kafka Broker 间认证账号。
user_kafka
提供给客户端使用(如 producer/consumer)。- ⚠️ Kafka 与 Zookeeper 认证用户需一致。
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cluster"
password="clusterpasswd"
user_kafka="kafkapasswd";
};
2. 创建 config/kafka_client_jaas.conf
:
- 用于 Kafka Producer、Consumer 客户端连接使用。
- 各脚本需要通过 JVM 参数加载该文件。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkapasswd";
};
3. 创建通用认证配置文件 config/sasl_client.conf
:
- 通用于脚本如
kafka-topics.sh
、kafka-consumer-groups.sh
等。- 使用时通过
--command-config
参数引入。
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapasswd";
若使用 SCRAM 验证:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafkapasswd";
4. 修改 Kafka 配置文件 config/server.properties
:
# 修改监听方式
listeners=SASL_PLAINTEXT://172.21.126.40:9092
# 添加以下配置
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:kafka
# 若需要 ACL 控制(根据版本选择)
# authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # Kafka 2.x
# authorizer.class.name=kafka.security.authorizer.AclAuthorizer # Kafka 3.x(可能启动失败)
# zookeeper.set.acl=true # 若 Zookeeper 未开启认证,请注释掉该项
5. 修改 Kafka 启动脚本 bin/kafka-server-start.sh
,加入以下环境变量:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data/kafka_2.12-3.5.1/config/kafka_server_jaas.conf"
6. 启动 Kafka 服务端:
bin/kafka-server-start.sh -daemon config/server.properties
四. 验证
使用已配置的 sasl_client.conf
验证功能是否正常:
查看 Topic 列表
bin/kafka-topics.sh --list --bootstrap-server 172.21.126.40:9092 --command-config config/sasl_client.conf
创建 Topic
bin/kafka-topics.sh --bootstrap-server 172.21.126.40:9092 --create --topic test --command-config config/sasl_client.conf
删除 Topic
bin/kafka-topics.sh --bootstrap-server 172.21.126.40:9092 --topic test --delete --command-config config/sasl_client.conf
查看消费组列表
bin/kafka-consumer-groups.sh --list --bootstrap-server 172.21.126.40:9092 --command-config config/sasl_client.conf