假设我有这些设置:
- 在一个分区中具有20个消息,
- max.poll.interval.ms = 10000(10秒),
- 最大轮询记录数= 6,
- 处理消息需要1秒。
在第一次poll()时,使用者将获得6条消息,在6秒内处理完并在最后提交。
此时会发生重新平衡,或者使用者只是再次调用poll()?
让我们假设它再次调用poll():
获取接下来的6条消息,也需要6秒来处理它,因此它将超过max.poll.interval. ms。
如果我没记错的话,最后的消费者将无法提交最后6条消息,因此在重新平衡后,消费者将再次获得这6条消息,对吗?
如果是这样的话,我认为将最大轮询记录数设置为尽可能低是最好的选择,或者在每个消息处理后提交(设置分区偏移=偏移+ 1)是可行的?
1条答案
按热度按时间oalqel3c1#
在回答之前,我想重点介绍几种消费者客户端配置。
max.poll.interval.ms
使用用户群组管理时,呼叫poll()之间的延迟上限。这会设定用户在撷取更多记录之前可以闲置的时间上限。如果在此逾时到期之前未呼叫poll(),则会将用户视为失败,且群组会重新平衡,以便将分割区重新指派给其他成员。
enable.auto.commit
如果为true,将在后台定期提交使用者的偏移量。
auto.commit.interval.ms
如果enable.auto.commit设置为true,则将使用者偏移量自动提交给Kafka的频率(以毫秒为单位)。
使用者在第一次轮询()后收到6条消息,并花6秒处理它们。因此,在处理结束时不会进行重新平衡,因为最大延迟(轮询间隔)为10秒,并且没有违反该值。
处理完成后,再次调用poll(),并复位轮询间隔定时器;消费者将再次获得6条新记录,并重复该过程。
注意:假设
enable.auto.commit
设定为true
如果使用者无法在
max.poll.interval.ms
(本例中为10秒)内处理收到的消息,则可能会出现两种情况enable.auto.commit
设置为false
或auto.commit.interval.ms
高于max.poll.interval.ms
,并且在处理之前未提交偏移量-在这种情况下,将发生重新平衡,新的使用方将再次获得消息。只有在这种情况下,才必须考虑更改max.poll.records
和/或max.poll.interval.ms
enable.auto.commit
设置为true
或auto.commit.interval.ms
小于max.poll.interval.ms
-在这种情况下,会进行重新平衡,但新使用方不会再次获得6条消息,因为已提交使用的偏移量