我遵循下面的文档,我有一个生产者和消费者的工作非常好的动势流。我想了解在发生任何异常时如何处理producer(source)中的错误。
根据spring stream错误处理文档,我尝试了以下方法:
@StreamListener("errorChannel")
@ServiceActivator(inputChannel = "errorChannel")
两者都不起作用。我在producer方法中显式抛出一个runtimeexception,并期望它位于“errorchannel”中,但无法接收相同的结果。
请帮助我弄清楚这一点或与我分享它的方法,如果有人成功地做到了这一点。
1条答案
按热度按时间mitkmikd1#
注意:我几个月前就遇到了这个问题。我们用Kafka代替动觉。既然你已经为spring cloud stream binder kafka贴上了标签,我就在同一个地方提供输入。
您可以为生产者编写自定义回拨,此回拨可以告诉您消息是否已失败或成功发布。失败时,记录消息的元数据。
下面是一个小的代码片段,不能更好地解释我所说的回调。
请注意,这是给Kafka的。相应地更改运动的代码。
我写了一篇关于在spring抽象的不同层次上处理生产者失败的文章。这对你有帮助。过来看:https://medium.com/@akhil.ghatiki/kafka-producer-failure-handling-at-multiple-levels-of-spring-abstractions-e530edb02a6c生产商故障处理