Kafka 配置 SASL/PLAIN 认证 下载安装zk与kafka 略过
安装目录为/data/kafka
安装目录为/data/zookeeper
配置 Kafka 1 vim config/server.properties
更改kafka配置文件,增加以下配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 delete.topic.enable=true listeners=SASL_PLAINTEXT://192.168.11.14:9092 advertised.listeners=SASL_PLAINTEXT://192.168.11.14:9092 # 启用ACL authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # 设置admin为超级用户 super.users=User:admin # 启用SCRAM机制,采用SCRAM-SHA-512/PLAIN算法 sasl.enabled.mechanisms=PLAIN # 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-512/PLAIN算法 sasl.mechanism.inter.broker.protocol=PLAIN # broker间通讯使用PLAINTEXT,本例中不演示SSL配置 security.inter.broker.protocol=SASL_PLAINTEXT zookeeper.set.acl=true # 禁止自动创建topic auto.create.topics.enable=true allow.everyone.if.no.acl.found=false # 参数 allow.everyone.if.no.acl.found # 设置为 true,ACL 机制改为黑名单机制,只有黑名单中的用户无法访问 # 设置为 false,ACL 机制改为白名单机制,只有白名单中的用户可以访问,默认值为 false
首先创建配置文件 kafka/config/kafka_server_jaas.conf
1 2 cd kafka_2.12-2.8.0/configvim kafka_server_jaas.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpass" user_admin="adminpass" user_producer="producer" user_consumer="consumer"; }; KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpass" user_admin="adminpass" user_producer="producer" user_consumer="consumer"; }; Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpass" user_admin="adminpass"; }; Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpass" user_admin="adminpass"; };
user_用户名
配置的是 client 连接 broker 时用的用户、密码。据我尝试,必须定义一个 username 用户对应的 user_ 字段,否则连不上。就像上面,有个 username=”testuser” ,所以必须再定义一次 user_testuser 且密码保持一致。
KafkaServer 和 KafkaClient 是 Kafka Brokers 之间,以及 Kafka 客户端通讯的配置,Server 和 Client 是 Kafka 与 Zookeeper 之间的配置。注意这里和 Zookeeper 配置的 jaas 有点不一样,Kafka 这里是 org.apache.kafka.common.security.plain.PlainLoginModule ,Zookeeper 是 org.apache.zookeeper.server.auth.DigestLoginModule 。
然后编辑启动脚本 bin/kafka-server-start.sh
,在后最添加一行 export 语句
1 2 3 export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/config/kafka_server_jaas.conf" exec $base_dir /kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@ "
最后再重新启动 kafka 就可以了。我们在 jaas 文件中配置了两用户,admin ,consumer和producer,其中我们设置了 admin 为超级用户,consumer和producer分别为生产者和消费者 。
配置 Zookeeper 由于 Kafka 的 metadata 数据是保存在 zookeeper 中的,所以需要设置 zookeeper 支持 SASL 验证,然后配置权限,禁止未登录用户随便删除 Topic 等。
编辑配置文件 zookeeper/conf/zoo.cfg
1 2 3 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000
其中 authProvider.1 这里的 .1 是 server_id ,多个 zookeeper 节点每个都要配置,例如你有 3 台 zk,那么需要再加上 authProvider.2 和 authProvider.3 等。
添加 jaas 配置文件 zookeeper/conf/zookeeper_jaas.conf
1 2 3 4 5 6 7 8 Client { org.apache.zookeeper.server.auth.DigestLoginModule required user_admin="adminpass"; }; Server { org.apache.zookeeper.server.auth.DigestLoginModule required user_admin="adminpass"; };
修改 zookeeper/bin/zkEnv.sh ,添加一行
1 export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOBINDIR} /../conf/zookeeper_jaas.conf $SERVER_JVMFLAGS "
这里因为我是用的单独的 Zookeeper 程序包,如果你用的是 Kafka 自带的 Zookeeper,那 SERVER_JVMFLAGS 要改成 KAFKA_OPTS 。滚动重启 Zookeeper 。
配置 zkCli.sh 使用上鉴权。创建配置文件 zookeeper/conf/adminclient_jaas.conf
1 2 3 4 5 Client { org.apache.zookeeper.server.auth.DigestLoginModule required username="admin" password="adminpass" ; };
然后修改 zookeeper/bin/zkCli.sh ,在 java 后面添加,
1 2 3 "$JAVA " "-Djava.security.auth.login.config=/data/zookeeper/conf/adminclient_jaas.conf" "-Dzookeeper.log.dir=${ZOO_LOG_DIR} " "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP} " \ -cp "$CLASSPATH " $CLIENT_JVMFLAGS $JVMFLAGS \ org.apache.zookeeper.ZooKeeperMain "$@ "
配置 Kafka ACL 权限 先修改 Kafka 里的 kafka/bin/zookeeper-security-migration.sh 文件,添加
1 export KAFKA_OPTS="-Djava.security.auth.login.config=/data/release/dp_kafka/config/kafka_server_jaas.conf"
然后运行命令 ,参考这里的文档
1 kafka/bin/zookeeper-security-migration.sh --zookeeper.connect 127.0.0.1:2181 --zookeeper.acl secure
这样 Zookeeper 里面的 metadata 都加上权限了。可以上 Zookeeper 验证下
1 2 3 4 5 6 7 zookeeper/bin/zkCli.sh # 登录进 shell 后,执行 getAcl 指令 getAcl /brokers/topics 'sasl,'admin : cdrwa 'world,'anyone : r
为了用上 Kafka 自带的 shell 工具,我们要配置 jaas 认证,新建一个 kafka/config/adminclient-configs.conf
1 2 3 4 sasl.jaas.config =org.apache.kafka.common.security.plain.PlainLoginModule \ required username ="admin" password="adminpass" security.protocol =SASL_PLAINTEXTsasl.mechanism =PLAIN
使用
1 kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --list
一些 ACL 命令使用,参考文档
1 2 3 4 5 6 7 8 # 列出 temp 这个 topic 的 ACL 列表 kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --list --topic "temp" # 对 temp 这个 topic 对用户 test 授权 kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --add --allow-principal User:test --operation Read --operation Write --operation AlterConfigs --operation Describe --operation DescribeConfigs --operation Alter --topic "temp" # 去掉 delete 权限 kafka/bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9092 --command-config kafka/config/adminclient-configs.conf --remove --allow-principal User:test --operation Delete --topic "temp"
配置 Consumer 和 Producer 如果 Kafka 配置了认证,再用脚本消费数据就会报错
1 2 3 4 5 6 7 kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning WARN [Consumer clientId=consumer-console-consumer-93884-1, groupId=console-consumer-93884] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) # 以下是 Kafka Server 端报错日志 INFO [SocketServer listenerType=ZK_BROKER, nodeId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
修改 config/consumer.properties
,添加以下配置
1 2 3 4 5 security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="adminpass";
修改 config/producer.properties
,添加以下配置
1 2 3 4 5 security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="adminpass" ;
最后再尝试运行上面的命令。
1 2 3 4 5 6 7 8 kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --consumer.config config/consumer.properties --topic test --from-beginning kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test \ --producer.config=config/producer.properties kafka/bin/kafka-topics.sh --list --bootstrap-server=localhost:9092 \ --command-config config/consumer.properties
已经加上鉴权的 zookeeper 去掉鉴权 如果对已经加上鉴权的 zookeeper、kafka 集群,现在不想要鉴权了,要回退回去,操作如下。
首先运行命令,把 kafka 在 zookeeper 里面的 metadata 数据去掉认证:
1 kafka/bin/zookeeper-security-migration.sh --zookeeper.connect 127.0.0.1:2181 --zookeeper.acl unsecure
去掉后,可以上 zookeeper 使用 getAcl 命令确认。
然后,修改 zookeeper 文件 zookeeper/bin/zkEnv.sh ,去掉之前加的
滚动重启 zookeeper。
再来修改 kafka 配置,修改 kafka/config/kafka_server_jaas.conf ,删除连接 zookeeper 的配置 Client 和 Server 。再修改 kafka/config/server.properties ,删除配置项 zookeeper.set.acl=true 。
再滚动重启 kafka 。
完
参考 [给 Kafka 配置 SASL/PLAIN 认证 )