我正在使用Python版本与Kafka一起工作。我有一个生产者,每次生产'n'条消息,我有一个消费者,必须消费这些消息。我希望消费者消费固定数量的消息'x',如果'x'条消息在该主题中可用,或者在主题中的消息数量小于x的情况下,消费者消费更少。我使用
msgs_pack = server.poll(timeout_ms=2500, max_records=x)
但是它没有给我我想要的行为。关于如何解决这个问题有什么建议吗?
我正在使用Python版本与Kafka一起工作。我有一个生产者,每次生产'n'条消息,我有一个消费者,必须消费这些消息。我希望消费者消费固定数量的消息'x',如果'x'条消息在该主题中可用,或者在主题中的消息数量小于x的情况下,消费者消费更少。我使用
msgs_pack = server.poll(timeout_ms=2500, max_records=x)
但是它没有给我我想要的行为。关于如何解决这个问题有什么建议吗?
1条答案
按热度按时间bnl4lu3b1#
在kafka-python中,您需要将
max_poll_records
传递给consumer属性,而不是poll方法本身。这已经默认为500,代表最多每次投票您将获得的最大金额。
否则,循环轮询记录并计数到所需的数量,然后处理批处理并提交偏移