KafkaConsumer类中的方法seek()具有以下javadoc注解:
覆盖使用者将在下一次轮询(超时)时使用的提取偏移。如果多次为同一分区调用此API,则将在下一次轮询()时使用最新的偏移。请注意,如果在使用过程中任意使用此API,则可能会丢失数据,重置提取偏移
我想知道什么是“在消费中”?它是否意味着我不能安全地在轮询循环中调用seek,并且我应该只在第一次轮询 * 之前 * 才调用它?此外,当调用seek后跟随commitAsync()或commitSync()时会发生什么?seek位置会被提交吗?
感谢您的澄清!
1条答案
按热度按时间8ehkhllq1#
它指的是这样一个事实,即你首先轮询到
max.poll.records
(默认值为500),然后在处理这些数据的过程中,不管提交与否,你都将修改下一次轮询发生的位置。这意味着,你可能会跳过记录,即丢失记录(向前查找),或者最终导致重复处理(向后查找)。理想情况下,应该中断所有已使用记录的循环,
commitSync()
使用已处理的记录(如果向后查找,则不使用),调用consumer.pause()
,然后**,然后**seek
,或者是的,在轮询之前查找(并提交查找到的偏移量)。如果您不暂停/停止使用者,则使用者可能会在搜寻时重新平衡,因为可能会超过
max.poll.interval.ms
。