Kafka使用者实现在Python中不起作用

qyswt5oh  于 2023-02-03  发布在  Apache
关注(0)|答案(1)|浏览(124)

第一次和Kafka合作,我遇到了一个问题。
我对我的消费者有以下实现:

from kafka import KafkaConsumer
import config

class KafkaMessageConsumer:
    def __init__(self):
        self.consumer = KafkaConsumer(
            bootstrap_servers=config.KAFKA_BOOTSTRAP_SERVER,
            security_protocol=config.KAFKA_SECURITY_PROTOCOL,
            sasl_mechanism=config.KAFKA_SASL_MECHANISM,
            sasl_plain_username=config.KAFKA_USERNAME,
            sasl_plain_password=config.KAFKA_PASSWORD,
            value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    )

    def receive_messages(self, topic):
        self.consumer.subscribe(topics=[topic])
        print(f"Subscribed to topics: {self.consumer.subscription()}")
        for msg in self.consumer:
            yield msg.value

if __name__ == "__main__":
    consumer = KafkaMessageConsumer()
    for message in consumer.receive_messages(config.KAFKA_TOPIC):
        print("Received message:", message)

应该正确实现凭据的地方。我收到订阅主题的消息,没有错误,但是没有生成消息,即使我确信有消息要在主题上使用。我在这里错过了一些必要的服务器配置吗?

qxgroojn

qxgroojn1#

我不是PythonMaven,但看起来您还没有使用任何消息。您已经订阅了该主题,但需要poll()查找消息https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.poll
还有,你在哪里设置了主题名称?[topic]

相关问题