Kafka配置SASL/PLAIN认证
LiuSw Lv6

Kafka 配置 SASL/PLAIN 认证

下载安装zk与kafka

略过

安装目录为/data/kafka

安装目录为/data/zookeeper

配置 Kafka

1
vim config/server.properties

更改kafka配置文件,增加以下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
delete.topic.enable=true  
listeners=SASL_PLAINTEXT://192.168.11.14:9092
advertised.listeners=SASL_PLAINTEXT://192.168.11.14:9092
# 启用ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 设置admin为超级用户
super.users=User:admin
# 启用SCRAM机制,采用SCRAM-SHA-512/PLAIN算法
sasl.enabled.mechanisms=PLAIN
# 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512/PLAIN算法
sasl.mechanism.inter.broker.protocol=PLAIN
# broker间通讯使用PLAINTEXT,本例中不演示SSL配置
security.inter.broker.protocol=SASL_PLAINTEXT
zookeeper.set.acl=true
# 禁止自动创建topic
auto.create.topics.enable=true

allow.everyone.if.no.acl.found=false
# 参数 allow.everyone.if.no.acl.found
# 设置为 true,ACL 机制改为黑名单机制,只有黑名单中的用户无法访问
# 设置为 false,ACL 机制改为白名单机制,只有白名单中的用户可以访问,默认值为 false

首先创建配置文件 kafka/config/kafka_server_jaas.conf

1
2
cd kafka_2.12-2.8.0/config
vim kafka_server_jaas.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass"
user_producer="producer"
user_consumer="consumer";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass"
user_producer="producer"
user_consumer="consumer";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass";
};
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpass"
user_admin="adminpass";
};

user_用户名 配置的是 client 连接 broker 时用的用户、密码。据我尝试,必须定义一个 username 用户对应的 user_ 字段,否则连不上。就像上面,有个 username=”testuser” ,所以必须再定义一次 user_testuser 且密码保持一致。

KafkaServer 和 KafkaClient 是 Kafka Brokers 之间,以及 Kafka 客户端通讯的配置,Server 和 Client 是 Kafka 与 Zookeeper 之间的配置。注意这里和 Zookeeper 配置的 jaas 有点不一样,Kafka 这里是 org.apache.kafka.common.security.plain.PlainLoginModule ,Zookeeper 是 org.apache.zookeeper.server.auth.DigestLoginModule 。

然后编辑启动脚本 bin/kafka-server-start.sh ,在后最添加一行 export 语句

1
2
3
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/config/kafka_server_jaas.conf"

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

最后再重新启动 kafka 就可以了。我们在 jaas 文件中配置了两用户,admin ,consumer和producer,其中我们设置了 admin 为超级用户,consumer和producer分别为生产者和消费者 。

配置 Zookeeper

由于 Kafka 的 metadata 数据是保存在 zookeeper 中的,所以需要设置 zookeeper 支持 SASL 验证,然后配置权限,禁止未登录用户随便删除 Topic 等。

编辑配置文件 zookeeper/conf/zoo.cfg

1
2
3
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

其中 authProvider.1 这里的 .1 是 server_id ,多个 zookeeper 节点每个都要配置,例如你有 3 台 zk,那么需要再加上 authProvider.2 和 authProvider.3 等。

添加 jaas 配置文件 zookeeper/conf/zookeeper_jaas.conf

1
2
3
4
5
6
7
8
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="adminpass";
};
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="adminpass";
};

修改 zookeeper/bin/zkEnv.sh ,添加一行

1
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOBINDIR}/../conf/zookeeper_jaas.conf $SERVER_JVMFLAGS"

这里因为我是用的单独的 Zookeeper 程序包,如果你用的是 Kafka 自带的 Zookeeper,那 SERVER_JVMFLAGS 要改成 KAFKA_OPTS 。滚动重启 Zookeeper 。

配置 zkCli.sh 使用上鉴权。创建配置文件 zookeeper/conf/adminclient_jaas.conf

1
2
3
4
5
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="adminpass";
};

然后修改 zookeeper/bin/zkCli.sh ,在 java 后面添加,

1
2
3
"$JAVA" "-Djava.security.auth.login.config=/data/zookeeper/conf/adminclient_jaas.conf" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
org.apache.zookeeper.ZooKeeperMain "$@"

配置 Kafka ACL 权限

先修改 Kafka 里的 kafka/bin/zookeeper-security-migration.sh 文件,添加

1
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/release/dp_kafka/config/kafka_server_jaas.conf"

然后运行命令 ,参考这里的文档

1
kafka/bin/zookeeper-security-migration.sh --zookeeper.connect 127.0.0.1:2181 --zookeeper.acl secure

这样 Zookeeper 里面的 metadata 都加上权限了。可以上 Zookeeper 验证下

1
2
3
4
5
6
7
zookeeper/bin/zkCli.sh
# 登录进 shell 后,执行 getAcl 指令
getAcl /brokers/topics
'sasl,'admin
: cdrwa
'world,'anyone
: r

为了用上 Kafka 自带的 shell 工具,我们要配置 jaas 认证,新建一个 kafka/config/adminclient-configs.conf

1
2
3
4
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="admin" password="adminpass";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

使用

1
kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --list

一些 ACL 命令使用,参考文档

1
2
3
4
5
6
7
8
# 列出 temp 这个 topic 的 ACL 列表
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --list --topic "temp"

# 对 temp 这个 topic 对用户 test 授权
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --add --allow-principal User:test --operation Read --operation Write --operation AlterConfigs --operation Describe --operation DescribeConfigs --operation Alter --topic "temp"

# 去掉 delete 权限
kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --remove --allow-principal User:test --operation Delete --topic "temp"

配置 Consumer 和 Producer

如果 Kafka 配置了认证,再用脚本消费数据就会报错

1
2
3
4
5
6
7
kafka/bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic  test --from-beginning 

WARN [Consumer clientId=consumer-console-consumer-93884-1, groupId=console-consumer-93884] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

# 以下是 Kafka Server 端报错日志

INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

修改 config/consumer.properties ,添加以下配置

1
2
3
4
5
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="adminpass";

修改 config/producer.properties ,添加以下配置

1
2
3
4
5
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="adminpass";

最后再尝试运行上面的命令。

1
2
3
4
5
6
7
8
kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--consumer.config config/consumer.properties --topic test --from-beginning

kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test \
--producer.config=config/producer.properties

kafka/bin/kafka-topics.sh --list --bootstrap-server=localhost:9092 \
--command-config config/consumer.properties

已经加上鉴权的 zookeeper 去掉鉴权

如果对已经加上鉴权的 zookeeper、kafka 集群,现在不想要鉴权了,要回退回去,操作如下。

首先运行命令,把 kafka 在 zookeeper 里面的 metadata 数据去掉认证:

1
kafka/bin/zookeeper-security-migration.sh --zookeeper.connect 127.0.0.1:2181 --zookeeper.acl unsecure

去掉后,可以上 zookeeper 使用 getAcl 命令确认。

然后,修改 zookeeper 文件 zookeeper/bin/zkEnv.sh ,去掉之前加的

1
2
# 把这一行注释掉
# export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOBINDIR}/../conf/zookeeper_jaas.conf $SERVER_JVMFLAGS"

滚动重启 zookeeper。

再来修改 kafka 配置,修改 kafka/config/kafka_server_jaas.conf ,删除连接 zookeeper 的配置 Client 和 Server 。再修改 kafka/config/server.properties ,删除配置项 zookeeper.set.acl=true 。

再滚动重启 kafka 。

参考 [给 Kafka 配置 SASL/PLAIN 认证 )

 评论