我有一个应用程序,它监听一个Kafka主题,并在1个线程中从中读取1条消息。在阅读一条消息之后,有一个特定的逻辑,在此期间有两个选项--一切都成功并且发生提交调用方法. acknowledge()来偏移偏移量并继续下一条消息,或者一切都不成功并且必须再次读取此消息并且不提交偏移量(. acknowledge()不被调用)
如果我没有调用Acknowledge(),那么消费者是否应该继续阅读主题中的所有报告,然后转到第二个循环,重新阅读没有调用Acknowledge()的消息?或者我如何配置它?
我正在使用spring-kafka,并指定了以下设置:
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=1
我用编程的方式设置了设置:
...setGroupId("test");
...setSyncCommits(true);
...setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
...setConcurrency(1);
1条答案
按热度按时间snz8szmq1#
否; Kafka维护两个指针,最后提交的偏移量和当前位置。它们是完全无关的,除了当前位置被设置为消费者首次启动时的最后提交偏移量。
不确认(提交偏移量)记录不会更改当前位置。
失败后不能继续处理其他记录,因为它们的偏移量将覆盖失败记录的偏移量。每个分区只有一个提交的偏移量。
您可以抛出一个异常(
DefaultErrorHandler
将执行必要的查找,以便重新传递当前记录,直到重试次数耗尽)。或者,您可以在确认时调用
nack()
,它也会执行相同的操作(但没有重试耗尽的概念)。请访问https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
和https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets的网站