Kafka安全认证配置

一. 背景

Kafka 提供了多种安全认证机制,主要分为 SSLSASL 两大类。其中 SASL/PLAIN 是基于账号密码的认证方式,使用较为广泛。

⚠️ 注意:Kafka 增加认证前务必提前配置好 /etc/hosts 的地址解析,否则很容易因认证导致连接超时。


二. 环境

  • 操作系统:Linux
  • Kafka 版本:kafka_2.12-3.5.1
  • Zookeeper 版本:zookeeper-3.7.2

三. 认证步骤

3.1 Zookeeper 配置认证和启动

⚠️ Zookeeper 可不配置认证。

1. 修改 zoo.cfg 配置文件,添加以下内容:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

2. 新建认证配置文件 /conf/zk_server_jaas.conf

  • usernamepassword 是 Zookeeper 集群内部认证信息。
  • user_kafka="kafkapasswd" 定义 Kafka 用户供客户端使用,需与 Kafka 配置文件一致。
  • ⚠️ 结尾的分号 不可遗漏!
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="cluster"
    password="clusterpasswd"
    user_kafka="kafkapasswd";
};

3. 拷贝 Kafka 的认证相关 Jar 包到 Zookeeper 的 lib 目录

从 Kafka 安装目录的 libs/ 拷贝以下文件至 Zookeeper 的 lib/

kafka-clients-3.5.1.jar
lz4-java-1.8.0.jar
slf4j-api-1.7.36.jar
slf4j-reload4j-1.7.36.jar
snappy-java-1.1.10.1.jar

4. 修改 bin/zkEnv.sh,添加:

SERVER_JVMFLAGS="-Djava.security.auth.login.config=/data/apache-zookeeper-3.7.2-bin/conf/zk_server_jaas.conf"

5. 启动 Zookeeper:

bin/zkServer.sh start

3.2 Kafka 配置和启动

1. 创建 config/kafka_server_jaas.conf

  • 第一行为 Kafka Broker 间认证账号。
  • user_kafka 提供给客户端使用(如 producer/consumer)。
  • ⚠️ Kafka 与 Zookeeper 认证用户需一致。
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="cluster"
    password="clusterpasswd"
    user_kafka="kafkapasswd";
};

2. 创建 config/kafka_client_jaas.conf

  • 用于 Kafka Producer、Consumer 客户端连接使用。
  • 各脚本需要通过 JVM 参数加载该文件。
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafkapasswd";
};

3. 创建通用认证配置文件 config/sasl_client.conf

  • 通用于脚本如 kafka-topics.shkafka-consumer-groups.sh 等。
  • 使用时通过 --command-config 参数引入。
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapasswd";

若使用 SCRAM 验证:

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafkapasswd";

4. 修改 Kafka 配置文件 config/server.properties

# 修改监听方式
listeners=SASL_PLAINTEXT://172.21.126.40:9092

# 添加以下配置
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:kafka

# 若需要 ACL 控制(根据版本选择)
# authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer   # Kafka 2.x
# authorizer.class.name=kafka.security.authorizer.AclAuthorizer   # Kafka 3.x(可能启动失败)
# zookeeper.set.acl=true     # 若 Zookeeper 未开启认证,请注释掉该项

5. 修改 Kafka 启动脚本 bin/kafka-server-start.sh,加入以下环境变量:

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/data/kafka_2.12-3.5.1/config/kafka_server_jaas.conf"

6. 启动 Kafka 服务端:

bin/kafka-server-start.sh -daemon config/server.properties

四. 验证

使用已配置的 sasl_client.conf 验证功能是否正常:

查看 Topic 列表

bin/kafka-topics.sh --list --bootstrap-server 172.21.126.40:9092 --command-config config/sasl_client.conf

创建 Topic

bin/kafka-topics.sh --bootstrap-server 172.21.126.40:9092 --create --topic test --command-config config/sasl_client.conf

删除 Topic

bin/kafka-topics.sh --bootstrap-server 172.21.126.40:9092 --topic test --delete --command-config config/sasl_client.conf

查看消费组列表

bin/kafka-consumer-groups.sh --list --bootstrap-server 172.21.126.40:9092 --command-config config/sasl_client.conf