我们开始在一个项目中使用springintegrationkafka,但是找不到任何关于用户端错误处理建议方法的文档。理想情况下,我们需要一个不阻塞线程的退避重试策略,因此,无状态重试建议似乎不是一个选项;但是,我找不到一个使用kafka的有状态重试建议的例子。关于如何使用spring集成kafka处理错误重试,有什么建议吗?谢谢。
soat7uwm1#
因为您的要求是在重试期间不阻塞线程,所以您可以将失败的消息写入另一个主题,并让一些后台进程从这个“死信队列”中使用,稍后再重新发布到原始主题。使用rabbitmq,这可以通过死信交换的适当配置自动完成,死信队列具有消息生存时间,但我不认为kafka有任何相似之处,因此您必须自己滚动。
moiiocjp2#
请共享有关此问题的配置,并指出您希望在何处使用重试建议。通常是任何 inbound-channel-adapter 把它的消息放到某个有订户的频道上,比如 <service-activator> 可配置为 RequestHandlerRetryAdvice .从另一边 <poller> 上 <int-kafka:inbound-channel-adapter> 可配置为 StatefulRetryOperationsInterceptor 如你所愿。
inbound-channel-adapter
<service-activator>
RequestHandlerRetryAdvice
<poller>
<int-kafka:inbound-channel-adapter>
StatefulRetryOperationsInterceptor
2条答案
按热度按时间soat7uwm1#
因为您的要求是在重试期间不阻塞线程,所以您可以将失败的消息写入另一个主题,并让一些后台进程从这个“死信队列”中使用,稍后再重新发布到原始主题。
使用rabbitmq,这可以通过死信交换的适当配置自动完成,死信队列具有消息生存时间,但我不认为kafka有任何相似之处,因此您必须自己滚动。
moiiocjp2#
请共享有关此问题的配置,并指出您希望在何处使用重试建议。
通常是任何
inbound-channel-adapter
把它的消息放到某个有订户的频道上,比如<service-activator>
可配置为RequestHandlerRetryAdvice
.从另一边
<poller>
上<int-kafka:inbound-channel-adapter>
可配置为StatefulRetryOperationsInterceptor
如你所愿。