目前在我的kafka消费者中,我已经关闭了auto commit,因此当前当处理消息失败时,例如:三条无效消息,手动确认失败,延迟增加到三条。
主题分区当前偏移量日志结束偏移量滞后
订单0 35 38 3
在这之后,如果一个新的传入的有效消息通过并成功地完成了对该消息的处理,那么它将被手动确认,并且在该消费者之后如下所示
主题分区当前偏移量日志结束偏移量滞后
订单0 39 39 0
当偏移量为36、37、38的消息未成功处理且同一个消费者不再读取时,为什么消费者会将当前偏移量设置为39
有人能解释一下这种行为吗?提前谢谢!
1条答案
按热度按时间mklgxw1f1#
在Kafka,消费者不会确认每一条信息。相反,它们确认(提交)最后处理的消息的偏移量。
例如,如果提交偏移量15,则隐式地表示您已经处理了0到15之间的所有消息。另外,提交15时,会覆盖以前的任何提交,因此无法知道之前是提交13还是14。
我建议你阅读文档中关于这个概念的消费者立场部分。
关于再加工,Kafka提供了一些选择。遇到处理失败时,在轮询更多消息和处理新记录之前,可以尝试重新处理消息。另一个选择是跳过它作为无效的,并继续(你目前正在做的)。
另一方面,您可以通过运行streams作业将有效消息导入选中的主题,并将错误消息转发到dlq来确保数据是好的。然后从这个选中的主题消费,你知道你只有好消息。请参见验证Kafka主题消息