我不得不使用SpringKafka(没有Spring!疯狂?但它是真的:)),我有下面的场景:配置了自动提交的使用者是记录侦听器(非批处理);侦听器可以抛出两种类型的异常:BackoffException和GeneralSystemException。在发生BackoffException时,错误处理程序应该查找分区,暂停分区,并且不提交记录;在发生GeneralSystemException时,它应发送到另一个延迟主题并提交偏移量。
即使对于BackoffException,问题也是这样的,因为isAckAfterHandling为true,所以正在提交偏移量。如果我将其设置为false,另一个异常也会有问题。
我找到的一个解决方法是:创建一个自定义的常见错误处理程序,保持isAckAfterHandling = true,执行查找并暂停,如果发生BackoffException,则返回一个Exception;
但是我想知道Spring是如何自动引导这种场景的,例如,你有一个使用者使用KafkaBackoffAwareMessageListenerAdapter和DefaultErrorHandler作为错误处理程序,并且auto commit = true。查看代码,看起来处理程序会正确地寻找,但是会提交导致BackoffException的偏移量。如果重新平衡,消息会丢失。
就像我说的,如果我使用一个自定义的错误处理程序,在BackoffException的情况下,向容器抛出一个错误(在执行seek和pause之后),这解决了我的问题,但看起来像是一个变通方案。
我相信如果Spring有一个isAckAfterHandle(异常前),应该可以解决这种问题。
事实证明,按照设计,我们应该总是为未恢复的记录返回异常,所以我的解决方案最终不是我所说的变通方案。
2条答案
按热度按时间smtd7mpg1#
据我所知,您希望以不同的方式处理两种类型的异常:BackoffException和GeneralSystemException。您希望避免为BackoffException提交偏移量,但为GeneralSystemException提交偏移量。
一种可能的解决方案是将有状态重试与实现ErrorHandler接口的自定义错误处理程序一起使用。这样,您就可以跟踪每条消息的重试状态,并根据异常类型决定是否提交。
另一种可能的解决方案是使用带有SeekToCurrentErrorHandler的KafkaBackoffAwareMessageListenerAdapter。当监听器返回回退结果时,此适配器将抛出KafkaBackoffException。然后,错误处理程序将查找失败的记录并暂停分区,直到回退时间到期。
gojuced72#
我不知道如果没有Spring,你怎么可能使用这个框架--你将失去所有的特性(比如事件发布)。
当
enable.auto.commit
为真时;容器与提交偏移量无关,它由kafka-clients
内部处理。