Python汇合Kafka:集团授权失败

ehxuflar  于 2023-03-11  发布在  Apache
关注(0)|答案(2)|浏览(157)

如何修复:尝试使用python客户端使用Kafka的消息时出现Group authorization failed错误?
相同的设置在Kafka CLI中也能正常工作。
通常这个错误应该指向无效的权限,但是,由于CLI的工作,我怀疑它一定是其他东西。
我的Kafka示例启用了SASL/SSL。使用了以下设置:

src_schema_registry_conf = {
    'url': 'http://<<host>>:8081'
}

src_schema_registry_client = SchemaRegistryClient(src_schema_registry_conf)
string_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(src_schema_registry_client)#,

src_conf = {
    "bootstrap.servers": '<<host>>:9093',
    "ssl.ca.location": 'certs/catrust.pem',
    "security.protocol":"SASL_SSL",
    "sasl.mechanism":"PLAIN",
    "sasl.username":"<<username>",
    "sasl.password":'<<secret_pasword',
    'key.deserializer': string_deserializer,
     'value.deserializer': avro_deserializer,
    'group.id': "test-consumer-group",
    'auto.offset.reset': "earliest"
}

消息的使用从以下位置启动:

consumer = DeserializingConsumer(src_conf)
consumer.subscribe(['<<topic>>'])

while True:
    try:
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        user = msg.value()
        if user is not None:
            print("record {}: value: {}\n"
                  .format(msg.key(), user))
    except KeyboardInterrupt:
        break

consumer.close()

我试着跟着这个例子走:https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_consumer.py,但无法使其在我的环境中工作。

相关问题