我正在尝试使用kafka-python让最小的Kafka消费者工作。身份验证通过OAuth进行。
我创建了一个TokenProvider类,它工作正常。任何时候,token()被调用,它都会检查是否有当前的令牌,如果没有,从OAuth URL中获取一个。
>>> from myoauth import TokenProvider
>>> tp = TokenProvider(instance="test", verbose=True)
>>> print(tp.token()[:20]+"...")
Request to https://<hostname>/auth/realms/kafka-flow/protocol/openid-connect/token:
Response 200 OK
eyJhbGciOiJSUzI1NiIs...
字符串
打印可用Kafka主题的最小消费者代码如下:
import ssl
from kafka import KafkaConsumer
from myoauth import TokenProvider
bootstrap_server = '<hostname>:443'
consumer = KafkaConsumer(bootstrap_servers=[bootstrap_server],
security_protocol='SSL',
ssl_context=ssl._create_unverified_context(),
sasl_mechanism="OAUTHBEARER",
sasl_oauth_token_provider=TokenProvider(instance="test", verbose=True),
client_id="kafka-client",
group_id="kafka-group",
)
print("Consumer created, {} topics found".format(len(consumer.topics())))
for topic in sorted(consumer.topics()):
print(topic)
型
输出结果令人惊讶:
$ ./get_topics.py
Consumer created, 0 topics found
型
我假设问题出在oauth上,因为TokenProvider类的token()
函数从未被调用(没有Request to https://...
输出)。没有token,很明显消费者无法访问任何主题。
任何想法如何解决这个赞赏。kafka-python
版本为2.0.2。python
版本为3.6.8。
1条答案
按热度按时间cuxqih211#
解决了。
security_protocol
应该是SASL_SSL
,因为它只是SSL
,SASL身份验证没有被触发。