第一次和Kafka合作,我遇到了一个问题。
我对我的消费者有以下实现:
from kafka import KafkaConsumer
import config
class KafkaMessageConsumer:
def __init__(self):
self.consumer = KafkaConsumer(
bootstrap_servers=config.KAFKA_BOOTSTRAP_SERVER,
security_protocol=config.KAFKA_SECURITY_PROTOCOL,
sasl_mechanism=config.KAFKA_SASL_MECHANISM,
sasl_plain_username=config.KAFKA_USERNAME,
sasl_plain_password=config.KAFKA_PASSWORD,
value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)
def receive_messages(self, topic):
self.consumer.subscribe(topics=[topic])
print(f"Subscribed to topics: {self.consumer.subscription()}")
for msg in self.consumer:
yield msg.value
if __name__ == "__main__":
consumer = KafkaMessageConsumer()
for message in consumer.receive_messages(config.KAFKA_TOPIC):
print("Received message:", message)
应该正确实现凭据的地方。我收到订阅主题的消息,没有错误,但是没有生成消息,即使我确信有消息要在主题上使用。我在这里错过了一些必要的服务器配置吗?
1条答案
按热度按时间qxgroojn1#
我不是PythonMaven,但看起来您还没有使用任何消息。您已经订阅了该主题,但需要poll()查找消息https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.poll
还有,你在哪里设置了主题名称?[topic]