如何修复:尝试使用python客户端使用Kafka的消息时出现Group authorization failed
错误?
相同的设置在Kafka CLI中也能正常工作。
通常这个错误应该指向无效的权限,但是,由于CLI的工作,我怀疑它一定是其他东西。
我的Kafka示例启用了SASL/SSL。使用了以下设置:
src_schema_registry_conf = {
'url': 'http://<<host>>:8081'
}
src_schema_registry_client = SchemaRegistryClient(src_schema_registry_conf)
string_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(src_schema_registry_client)#,
src_conf = {
"bootstrap.servers": '<<host>>:9093',
"ssl.ca.location": 'certs/catrust.pem',
"security.protocol":"SASL_SSL",
"sasl.mechanism":"PLAIN",
"sasl.username":"<<username>",
"sasl.password":'<<secret_pasword',
'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer,
'group.id': "test-consumer-group",
'auto.offset.reset': "earliest"
}
消息的使用从以下位置启动:
consumer = DeserializingConsumer(src_conf)
consumer.subscribe(['<<topic>>'])
while True:
try:
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = consumer.poll(1.0)
if msg is None:
continue
user = msg.value()
if user is not None:
print("record {}: value: {}\n"
.format(msg.key(), user))
except KeyboardInterrupt:
break
consumer.close()
我试着跟着这个例子走:https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_consumer.py,但无法使其在我的环境中工作。
2条答案
按热度按时间q5lcpyga1#
事实证明,答案比预想的要简单:
用户名必须用
ALLCAPS
书写。然后接受。0wi1tuuw2#
答案是API密钥无权访问主题
https://support.confluent.io/hc/en-us/articles/13193806015252-How-to-fix-the-error-Group-authorization-failed-FindCoordinator-response-error-Group-authorization-failed-