我希望流在第7/8个元素后停止。为什么它没有停止?它一直持续到最后一个元素(第20个)。
我想实现的是,在系统终止时,流停止请求新元素,系统等待终止,直到流中的所有元素都被完全处理(到达接收器)
object StreamKillSwitch extends App {
implicit val system = ActorSystem(Behaviors.ignore, "sks")
implicit val ec: ExecutionContext = system.executionContext
val (killStream, done) =
Source(1 to 20)
.viaMat(KillSwitches.single)(Keep.right)
.map(i => {
system.log.info(s"Start task $i")
Thread.sleep(100)
system.log.info(s"End task $i")
i
})
.toMat(Sink.foreach(println))(Keep.both)
.run()
CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
() => Future(killStream.shutdown()).map(_ => Done)
}
CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
() => done
}
Thread.sleep(720)
system.terminate()
Await.ready(system.whenTerminated, 5.seconds)
}
1条答案
按热度按时间xtfmy6hx1#
我能够“解决”这个问题,但我很难解释这种行为。
由于某种原因,您的
map
(特别是Thread.sleep
)实现导致killStream.shutdown()
中的future/promise回调传播速度不够快。我猜它会用阻塞的线程填满主调度程序。您可以增加源元素
Source(1 to 10000)
的数量和Await
超时,以查看kill开关最终是否传播。但是,添加异步边界确实解决了该问题,并获得了预期的结果。
将您的
map
替换为以下mapAsync
,它将按预期工作。通过添加
async
标记可以获得类似的结果。