带有sasl_mechanism“Oscillator BEARER”的KafkaConsumer不调用token()函数

dluptydi  于 2023-11-16  发布在  Apache
关注(0)|答案(1)|浏览(163)

我正在尝试使用kafka-python让最小的Kafka消费者工作。身份验证通过OAuth进行。
我创建了一个TokenProvider类,它工作正常。任何时候,token()被调用,它都会检查是否有当前的令牌,如果没有,从OAuth URL中获取一个。

  1. >>> from myoauth import TokenProvider
  2. >>> tp = TokenProvider(instance="test", verbose=True)
  3. >>> print(tp.token()[:20]+"...")
  4. Request to https://<hostname>/auth/realms/kafka-flow/protocol/openid-connect/token:
  5. Response 200 OK
  6. eyJhbGciOiJSUzI1NiIs...

字符串
打印可用Kafka主题的最小消费者代码如下:

  1. import ssl
  2. from kafka import KafkaConsumer
  3. from myoauth import TokenProvider
  4. bootstrap_server = '<hostname>:443'
  5. consumer = KafkaConsumer(bootstrap_servers=[bootstrap_server],
  6. security_protocol='SSL',
  7. ssl_context=ssl._create_unverified_context(),
  8. sasl_mechanism="OAUTHBEARER",
  9. sasl_oauth_token_provider=TokenProvider(instance="test", verbose=True),
  10. client_id="kafka-client",
  11. group_id="kafka-group",
  12. )
  13. print("Consumer created, {} topics found".format(len(consumer.topics())))
  14. for topic in sorted(consumer.topics()):
  15. print(topic)


输出结果令人惊讶:

  1. $ ./get_topics.py
  2. Consumer created, 0 topics found


我假设问题出在oauth上,因为TokenProvider类的token()函数从未被调用(没有Request to https://...输出)。没有token,很明显消费者无法访问任何主题。
任何想法如何解决这个赞赏。
kafka-python版本为2.0.2。python版本为3.6.8。

cuxqih21

cuxqih211#

解决了。security_protocol应该是SASL_SSL,因为它只是SSL,SASL身份验证没有被触发。

相关问题