我有一个简单的Kafka流的可提交源代码,它被 Package 在RestartSource中。它在happy path中运行良好,但如果我故意切断与Kafka集群的连接,它会从底层Kafka客户端抛出连接异常并报告Kafka Consumer Shut Down。我的期望是它在大约150秒后重新启动流,但它没有。我对RestartSource的理解/使用是否不正确?
val atomicControl = new AtomicReference[Consumer.Control](NoopControl)
val restartablekafkaSourceWithFlow = {
RestartSource.withBackoff(30.seconds, 120.seconds, 0.2) {
() => {
Consumer.committableSource(consumerSettings.withClientId("clientId"), Subscriptions.topics(Set("someTopic")))
.mapMaterializedValue(c => atomicControl.set(c))
.via(someFlow)
.via(httpFlow)
}
}
}
val committerSink: Sink[(Any, ConsumerMessage.CommittableOffset), Future[Done]] = Committer.sinkWithOffsetContext(CommitterSettings(actorSystem))
val runnableGraph = restartablekafkaSourceWithFlow.toMat(committerSink)(Keep.both)
val control = runnableGraph.mapMaterializedValue(x => Consumer.DrainingControl.apply(atomicControl.get, x._2)).run()
1条答案
按热度按时间ghhkc1vu1#
可能您在
RestartSource
之外得到错误。您可以添加
recover
来查看错误,和/或创建如下所示的决策器并在runnableGraph
中使用它。