如何对akka流中的异常(上游故障)做出React

y3bcpkx1  于 2022-11-06  发布在  React
关注(0)|答案(1)|浏览(179)

我有一个不受我控制的akka Flow[I, O],因为它来自一些第三方代码。(例如,因为在流的某个部分抛出了异常)。为此,我需要产生失败的输入元素。我没有在流上找到任何API或类似的允许我注册处理程序或以任何方式对它做出React的API。我怎么能那样做呢?

z6psavjg

z6psavjg1#

当akka流抛出异常时,您希望使用Resume而不是Stop。在收集了所有成功的元素后,您可以使用Seq#diff来告诉由于抛出异常而丢弃了哪些元素。

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

object Exception {

  case class MyException(n: Int) extends RuntimeException

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Exception")
    implicit val ec: ExecutionContext = system.dispatcher

    val decider: Supervision.Decider = {
      case _: MyException => Supervision.Resume
      case _                      => Supervision.Stop
    }
    val flow = Flow[Int]
      .map(n =>
        if (n % 2 == 1) throw MyException(n)
        else n
      )
    val in = 1 to 10
    val outFuture = Source(in)
      .via(flow)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
    outFuture.onComplete {
      case Success(out) =>
        println("dropped elements are " + (in.diff(out)))
      case Failure(_) =>
        println("unknown failure")
    }
  }
}

控制台输出为:

dropped elements are Vector(1, 3, 5, 7, 9)

参考:How to get object that caused failure in Akka Streams?

相关问题