尽管maxRetries〈5且>0,Akka RestartSource仍会永远重试

w6lpcovy  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(103)

我试图找出为什么我在这里得到一个死循环:

object TestCase {
  implicit val ec = ExecutionContext.global
  implicit val actorSystem: ActorSystem           = ActorSystem()
  implicit val executionContext: ExecutionContext = actorSystem.dispatcher

  def main(args: Array[String]): Unit = {
    val finiteSource = Source(1 to 5).mapAsync(2) {
      i =>
        if(i == 2) {
          Future {
            Thread.sleep(50)
            i
            throw new RuntimeException()
          }
        } else Future.successful(i)

    }
    val tt = Source(1 to 5).flatMapConcat { i  =>
      println("--------------------------->"+ i)
      finiteSource
    }
    val forever: Source[Int, NotUsed] = RestartSource.onFailuresWithBackoff(
      minBackoff = Duration(10, MILLISECONDS),
      maxBackoff= Duration(10, MILLISECONDS),
      randomFactor=  0.1,
      maxRestarts= 2)(() => tt)

    val tt1 = forever.runWith(Sink.foreach(println))

    println(Await.result(tt1, Duration.Inf ))
  }
}

如果我运行这个,我会得到无尽的重试。但是一旦我将最小+最大回退更改为一个大于50毫秒的“请求时间”的值,最大重试逻辑就会工作。
我发现了一个bug吗?这个行为对我来说没有意义。为什么我需要提前知道我的请求需要多长时间来避免无限循环?

mzmfm0qo

mzmfm0qo1#

来自RestartSource.onFailuresWithBackoff的文档
maxRestarts:在minBackoff的时间范围内,重新启动的次数上限为该次数。传递0将不会导致重新启动,而负数将不会限制重新启动的次数
Akka Streams 2.6.10引入了RestartSettings,允许设置maxRestartsWithin持续时间。
请参阅this github issue proposing RestartSettings:一旦RestartSource内的源运行了minBackoff,它实际上是不朽的,因为RestartSource背后的意图是它被用于将快速失效的源。

相关问题