我试图创建一个简单的Kafka生产者的基础上汇合Kafka。我的代码如下:
# !/usr/bin/env python
from confluent_kafka import Producer
import json
def delivery_report(err, msg):
"""Called once for each message produced to indicate delivery result.
Triggered by poll() or flush().
see https://github.com/confluentinc/confluent-kafka-python/blob/master/README.md"""
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(
msg.topic(), msg.partition()))
class MySource:
"""Kafka producer"""
def __init__(self, kafka_hosts, topic):
"""
:kafka_host list(str): hostnames or 'host:port' of Kafka
:topic str: topic to produce messages to
"""
self.topic = topic
# see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
config = {
'metadata.broker.list': ','.join(kafka_hosts),
'group.id': 'mygroup',
}
self.producer = Producer(config)
@staticmethod
def main():
topic = 'my-topic'
message = json.dumps({
'measurement': [1, 2, 3]})
mys = MySource(['kafka'], topic)
mys.producer.produce(
topic, message, on_delivery=delivery_report)
mys.producer.flush()
if __name__ == "__main__":
MySource.main()
当我第一次使用一个主题(这里是“我的主题”)时,kafka的React是“自动创建主题我的主题有1个分区,复制因子1成功(kafka.server.kafkaapis)”。但是,回调函数( on_delivery=delivery_report
)从来没打过电话 flush()
(如果我为flush设置了超时,它将终止)无论是第一次还是以后的时间。如果我使用现有的主题,Kafka日志不会显示任何内容。
暂无答案!
目前还没有任何答案,快来回答吧!