我试图找出为什么我在这里得到一个死循环:
object TestCase {
implicit val ec = ExecutionContext.global
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val executionContext: ExecutionContext = actorSystem.dispatcher
def main(args: Array[String]): Unit = {
val finiteSource = Source(1 to 5).mapAsync(2) {
i =>
if(i == 2) {
Future {
Thread.sleep(50)
i
throw new RuntimeException()
}
} else Future.successful(i)
}
val tt = Source(1 to 5).flatMapConcat { i =>
println("--------------------------->"+ i)
finiteSource
}
val forever: Source[Int, NotUsed] = RestartSource.onFailuresWithBackoff(
minBackoff = Duration(10, MILLISECONDS),
maxBackoff= Duration(10, MILLISECONDS),
randomFactor= 0.1,
maxRestarts= 2)(() => tt)
val tt1 = forever.runWith(Sink.foreach(println))
println(Await.result(tt1, Duration.Inf ))
}
}
如果我运行这个,我会得到无尽的重试。但是一旦我将最小+最大回退更改为一个大于50毫秒的“请求时间”的值,最大重试逻辑就会工作。
我发现了一个bug吗?这个行为对我来说没有意义。为什么我需要提前知道我的请求需要多长时间来避免无限循环?
1条答案
按热度按时间mzmfm0qo1#
来自
RestartSource.onFailuresWithBackoff
的文档maxRestarts
:在minBackoff
的时间范围内,重新启动的次数上限为该次数。传递0将不会导致重新启动,而负数将不会限制重新启动的次数Akka Streams 2.6.10引入了
RestartSettings
,允许设置maxRestartsWithin
持续时间。请参阅this github issue proposing
RestartSettings
:一旦RestartSource
内的源运行了minBackoff
,它实际上是不朽的,因为RestartSource
背后的意图是它被用于将快速失效的源。