如何在最大RestartSource后捕获alpakka Kafka源流故障

0kjbasz6  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(112)

如何在最大数量的重启后捕获RestartSource的错误?我想在源失败的最大次数之后做一些事情。我可以在日志中看到源重新启动。我试着添加一个withAttributes,但它从未被调用。

  1. return RestartSource.onFailuresWithBackoff(restartSettings, () -> Consumer
  2. .committableSource(getConsumerSettings(), topics)
  3. .log("error on receiver topic")
  4. .mapMaterializedValue(ctrl -> {
  5. control = ctrl;
  6. return NotUsed.getInstance();
  7. })
  8. .withAttributes(ActorAttributes.withSupervisionStrategy(e -> {
  9. log.error("Stream has failed", e);
  10. return (Supervision.Directive) Supervision.stop();
  11. })));

任何建议将不胜感激。

zf9nrax1

zf9nrax11#

好了,我搞定了。在这种情况下,属性似乎从未被调用,但这会导致整个流失败,因此结果将在附加流和接收器完成上。在我的情况下,我有这样的东西:

  1. streamCompletion =
  2. createSource()
  3. ...
  4. runWith(Sink.ignore(), actorSystem);

然后,我们可以像下面这样在streamCompletion上获得失败:

  1. streamCompletion.whenComplete((done, throwable) -> {
  2. if (throwable != null) {
  3. log.error("Some error occurred {} {}", done, throwable.getMessage(), throwable);
  4. }
  5. });

相关问题