我已经开始学习confluent Kafka(python)。有1个生产者,1个主题,1个分区和1个消费者(一个简单的设置)。我的要求是我希望集体获取数据。我读到使用poll(some_time)将等待所需的时间并批量/列表地获取记录。我认为这将是一个简单的迭代过程,类似于:
msgs = consumer.poll(1000)for msg in msgs:do some action....
问题是我无法覆盖这个'msgs'对象-我浏览了文档以了解poll返回单个消息-所以没有办法获得消息列表吗?(一个解决方案是频繁地获取所需时间的子集,然后集中处理数据-但希望有另一种方法)。poll()迭代过程似乎与kafka-python的方式不同。
2条答案
按热度按时间ego6inou1#
你需要一个while循环。它写在文档和github仓库中的可运行示例中。
Confluent库是在C/C++客户端上实现的,而不是像kafka-python或aiokafka那样在Python中原生实现的,因此不会直接使用原生Python可迭代对象。不过Kafka消费者协议 * 确实 * 进行预取,因此您实际上是从本地队列中轮询,而不是每次实际对偏移量进行一次网络调用。请参阅
queued.min.messages
-https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md如果你想要这个迭代器功能,搜索github issues。我相信有人也想要同样的东西。
jljoyd4f2#
python中的poll方法本身并不支持批处理,因此正如您提到的:“* 一个解决方案是频繁地获取所需时间的子集,然后集中处理数据 *”
看到这个article,它可以帮助你