如何配置kafka消费者,以便在自动缩放期间和之后获取的消息总数保持不变?

xqkwcwgp  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(295)

假设我有一个消费者正在运行,它从10个分区获取数据。在一个轮询请求中,使用者每个分区获取10条记录,总共100条记录。
现在,在向组中再添加一个使用者并重新平衡之后,两个使用者都从5个分区获取数据,每个使用者现在总共获取50条记录(每个分区10条)。
我想知道是否有一种方法可以配置kafka消费者,这样即使再添加一个消费者,两个消费者也会开始在每个分区获取20条记录,这样总数仍然是100条。
我尝试使用max.poll.records和fetch.max.bytes,但对我无效。将fetch.max.bytes设置为1000之后,kafka正在从分区中获取25条记录。将max.poll.records设置为50后,每个分区在轮询期间有25条max记录,因此10个分区有250条记录。我想把记录保持在总共50张。我该怎么做?

3ks5zfa0

3ks5zfa01#

没有可以设置的直接配置来告诉kafkaconsumer它应该获取多少消息。
我确信还有其他解决方案,但我看到以下两种选择:
如果您知道消息的大小,并且消息的字节大小大致相同,请使用 fetch.min.bytes 一起 fetch.max.wait.ms 获取所需的最少消息。调整 max.poll.records 你可以试着找到确切的号码。
使用 seek 以准确地告诉使用者每个分区应该在下一个分区上获取数据的偏移位置 poll . 这个 seek 在kafkaconsumer的javadocs中,api被描述为“覆盖消费者将在下一次轮询(超时)时使用的获取偏移量”。如果对同一分区多次调用此api,则下次poll()时将使用最新的偏移量。请注意,如果在使用过程中随意使用此api来重置获取偏移量,则可能会丢失数据”。

相关问题