尽管使用了RestartSource,但在连接错误后Alpakka Kafka Stream未重新启动

6vl6ewon  于 2022-11-06  发布在  Kafka
关注(0)|答案(1)|浏览(159)

我有一个简单的Kafka流的可提交源代码,它被 Package 在RestartSource中。它在happy path中运行良好,但如果我故意切断与Kafka集群的连接,它会从底层Kafka客户端抛出连接异常并报告Kafka Consumer Shut Down。我的期望是它在大约150秒后重新启动流,但它没有。我对RestartSource的理解/使用是否不正确?

val atomicControl = new AtomicReference[Consumer.Control](NoopControl)
val restartablekafkaSourceWithFlow = {
        RestartSource.withBackoff(30.seconds, 120.seconds, 0.2) {
          () => {
            Consumer.committableSource(consumerSettings.withClientId("clientId"), Subscriptions.topics(Set("someTopic")))
              .mapMaterializedValue(c => atomicControl.set(c))
              .via(someFlow)
              .via(httpFlow)
          }
        }
      }
val committerSink: Sink[(Any, ConsumerMessage.CommittableOffset), Future[Done]] = Committer.sinkWithOffsetContext(CommitterSettings(actorSystem))

val runnableGraph = restartablekafkaSourceWithFlow.toMat(committerSink)(Keep.both)

val control = runnableGraph.mapMaterializedValue(x => Consumer.DrainingControl.apply(atomicControl.get, x._2)).run()
ghhkc1vu

ghhkc1vu1#

可能您在RestartSource之外得到错误。
您可以添加recover来查看错误,和/或创建如下所示的决策器并在runnableGraph中使用它。

private val decider: Supervision.Decider = { e =>
    logger.error("Unhandled exception in stream.", e)
    Supervision.Resume
  }

runnableGraph.withAttributes(supervisionStrategy(decider))

相关问题