apache-kafka 使用轮询从某个主题中读取的确切消息数

vsikbqxv  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(136)

我正在使用Python版本与Kafka一起工作。我有一个生产者,每次生产'n'条消息,我有一个消费者,必须消费这些消息。我希望消费者消费固定数量的消息'x',如果'x'条消息在该主题中可用,或者在主题中的消息数量小于x的情况下,消费者消费更少。我使用

msgs_pack = server.poll(timeout_ms=2500, max_records=x)

但是它没有给我我想要的行为。关于如何解决这个问题有什么建议吗?

bnl4lu3b

bnl4lu3b1#

在kafka-python中,您需要将max_poll_records传递给consumer属性,而不是poll方法本身。
这已经默认为500,代表最多每次投票您将获得的最大金额。
否则,循环轮询记录并计数到所需的数量,然后处理批处理并提交偏移

相关问题