我正在学习Kafka,我正在使用JAVA API玩以下场景:
- 手动提交
- 单主题,单分区
- 单一生产者单一消费者
- 最大轮询记录数=5或1(两者获得相同结果)
制片人给Kafka发了一些信息。消费者检索消息,并且应该模拟异常情况行为:随机地,每~10个消息一次,其未提交的偏移。对于剩余的90%,消费者在处理每个消息之后调用commitSynch。
预期的好场景:如果我以消费者程序结束并调用consumer.close()的方式模拟异常情况,那么一切都按预期进行。也就是说,如果offest 999是最后一个未提交的偏移量,那么当我再次运行消费者程序时,它将从偏移量999开始。
我需要帮助的意外情况:我想做同样的事情,但不退出消费者程序。当消费者处理下一条消息并随机决定不确认它时,流程简单地转到下一次执行consumer. poll。在下一次轮询中,再次假设最后一个未提交的偏移量是999,我希望偏移量为999的消息是第一个返回的消息。但事实并非如此:我得到偏移量为1000、1001...因此,实际上,即使偏移量999尚未提交,也从不重新处理具有偏移量999的消息。
我想首先理解的是这个区别--重新处理发生在消费者程序再次运行时,但它不会发生在消费者程序不断循环运行时。
2条答案
按热度按时间xyhw6mcr1#
如果从未重新启动使用者,则将始终按顺序轮询偏移量。这应该是预期的行为。
只有当它重新启动时,
auto.offset.reset
属性才会对未提交的偏移量生效,否则将使用已提交的值,欢迎您通过手动调用seek函数来忽略该值也就是说,如果您试图实现重新处理逻辑,您的选择是在内存中手动临时跟踪导致异常的偏移量,并向后搜索+轮询重试,或者引入一个死信主题供以后使用
niknxzdl2#
我找到了一种方法,通过取消订阅消费者并在异常条件发生时再次订阅,使第二个场景按照我希望的方式运行(即,与第一个场景完全相同)。完成后,消费者从最后一个未提交的偏移量开始。