如何在最大数量的重启后捕获RestartSource的错误?我想在源失败的最大次数之后做一些事情。我可以在日志中看到源重新启动。我试着添加一个withAttributes,但它从未被调用。
return RestartSource.onFailuresWithBackoff(restartSettings, () -> Consumer
.committableSource(getConsumerSettings(), topics)
.log("error on receiver topic")
.mapMaterializedValue(ctrl -> {
control = ctrl;
return NotUsed.getInstance();
})
.withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
log.error("Stream has failed", e);
return (Supervision.Directive) Supervision.stop();
})));
任何建议将不胜感激。
1条答案
按热度按时间zf9nrax11#
好了,我搞定了。在这种情况下,属性似乎从未被调用,但这会导致整个流失败,因此结果将在附加流和接收器完成上。在我的情况下,我有这样的东西:
然后,我们可以像下面这样在streamCompletion上获得失败: