producer:尽管Kafka是可到达的,但消息似乎永远不会被传递

wko9yo5t  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(182)

我试图创建一个简单的Kafka生产者的基础上汇合Kafka。我的代码如下:


# !/usr/bin/env python

 from confluent_kafka import Producer
 import json

 def delivery_report(err, msg):
     """Called once for each message produced to indicate delivery result.
     Triggered by poll() or flush().
     see https://github.com/confluentinc/confluent-kafka-python/blob/master/README.md"""
     if err is not None:
         print('Message delivery failed: {}'.format(err))
     else:
         print('Message delivered to {} [{}]'.format(
             msg.topic(), msg.partition()))

 class MySource:
     """Kafka producer"""
     def __init__(self, kafka_hosts, topic):
         """
         :kafka_host list(str): hostnames or 'host:port' of Kafka
         :topic str: topic to produce messages to
         """
         self.topic = topic
         # see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
         config = {
             'metadata.broker.list': ','.join(kafka_hosts),
             'group.id': 'mygroup',
             }
         self.producer = Producer(config)

     @staticmethod
     def main():
         topic = 'my-topic'
         message = json.dumps({
             'measurement': [1, 2, 3]})
         mys = MySource(['kafka'], topic)
         mys.producer.produce(
                 topic, message, on_delivery=delivery_report)
         mys.producer.flush()

 if __name__ == "__main__":
     MySource.main()

当我第一次使用一个主题(这里是“我的主题”)时,kafka的React是“自动创建主题我的主题有1个分区,复制因子1成功(kafka.server.kafkaapis)”。但是,回调函数( on_delivery=delivery_report )从来没打过电话 flush() (如果我为flush设置了超时,它将终止)无论是第一次还是以后的时间。如果我使用现有的主题,Kafka日志不会显示任何内容。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题