kafka使用pem密钥和客户端证书的ssl连接

wdebmtf2  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(1172)

我可以使用client.properties中下面的ssl详细信息连接到kafka并从cli(bin/kafka console consumer.sh)读取数据

ssl.keystore.location=/test/keystore.jks
ssl.keystore.password=abcd1234
ssl.key.password=abcd1234
Command: bin/kafka-console-consumer.sh --bootstrap-server 'server details'  --topic topic_name --consumer.config client.properties --group group-id

但我无法使用相同的数据从python或spark连接 consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_server,security_protocol='SSL',sasl_mechanism='PLAIN',ssl_certfile='certificate.pem',ssl_keyfile='pk.key') 我尝试在上面的代码中更改多个选项,例如添加check\u host\u name等,但没有成功。kafka不属于我们的团队,而是由另一个团队来管理,当我们请求访问时,我们会得到一个私钥和证书以及ca包和arn名称。
从spark(python)开始,我尝试了以下代码

sdf1 = spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers",bootstrap_server)
       .option("subscribe", topic_name)
       .option("startingOffsets", "latest")
       .option("kafka.security.protocol","SSL")
       .option("kafka.ssl.keystore.location",'keystore.jks')
       .option("kafka.ssl.keystore.password", '****')
       .option("kafka.ssl.key.password",'****')
       .load()

我收到了类似“org.apache.kafka.common.errors.groupauthorizationexception:无权访问组:spark-kafka-source-x-xx-xx”的错误
上述错误可能与spark每次访问时都生成唯一组id有关。仅在spark 3.0及更高版本中才允许在sparkDataframe中使用组id。我需要在spark 2.4.4中修复这个选项。
如有任何建议,将不胜感激。

c3frrgcw

c3frrgcw1#

您只需要提供用于验证对主题的访问的主体,而不考虑使用者组。它看起来是这样的:

kafka-acls --authorizer-properties zookeeper.connect=zk_ip_or_fqdn:2181  --add  --allow-principal User:"userName" --operation All --topic yourTopicName --group=*

在您的情况下,用户名(主体名称)将是ssl证书的主体名称,形式为“cn=toto,ou=titi,…”。

相关问题