优雅地停止 akka 流

nhaq1z21  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(143)

我希望流在第7/8个元素后停止。为什么它没有停止?它一直持续到最后一个元素(第20个)。
我想实现的是,在系统终止时,流停止请求新元素,系统等待终止,直到流中的所有元素都被完全处理(到达接收器)

object StreamKillSwitch extends App {

  implicit val system = ActorSystem(Behaviors.ignore, "sks")
  implicit val ec: ExecutionContext = system.executionContext

  val (killStream, done) =
    Source(1 to 20)
      .viaMat(KillSwitches.single)(Keep.right)
      .map(i => {
        system.log.info(s"Start task $i")
        Thread.sleep(100)
        system.log.info(s"End task $i")
        i
      })
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
      () => Future(killStream.shutdown()).map(_ => Done)
    }

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
      () => done
    }

  Thread.sleep(720)

  system.terminate()
  Await.ready(system.whenTerminated, 5.seconds)
}
xtfmy6hx

xtfmy6hx1#

我能够“解决”这个问题,但我很难解释这种行为。
由于某种原因,您的map(特别是Thread.sleep)实现导致killStream.shutdown()中的future/promise回调传播速度不够快。我猜它会用阻塞的线程填满主调度程序。
您可以增加源元素Source(1 to 10000)的数量和Await超时,以查看kill开关最终是否传播。
但是,添加异步边界确实解决了该问题,并获得了预期的结果。
将您的map替换为以下mapAsync,它将按预期工作。

.mapAsync(1) { i =>
  Future {
    system.log.info(s"Start task $i")
    Thread.sleep(100)
    system.log.info(s"Start task $i")
    i
  }
}

通过添加async标记可以获得类似的结果。

.map(i => {
  system.log.info(s"Start task $i")
  Thread.sleep(100)
  system.log.info(s"End task $i")
  i
})
.async

相关问题