我有一个不受我控制的akka Flow[I, O],因为它来自一些第三方代码。(例如,因为在流的某个部分抛出了异常)。为此,我需要产生失败的输入元素。我没有在流上找到任何API或类似的允许我注册处理程序或以任何方式对它做出React的API。我怎么能那样做呢?
Flow[I, O]
z6psavjg1#
当akka流抛出异常时,您希望使用Resume而不是Stop。在收集了所有成功的元素后,您可以使用Seq#diff来告诉由于抛出异常而丢弃了哪些元素。
Resume
Stop
Seq#diff
import scala.concurrent.ExecutionContext import scala.util.{Failure, Success} object Exception { case class MyException(n: Int) extends RuntimeException def main(args: Array[String]): Unit = { implicit val system: ActorSystem = ActorSystem("Exception") implicit val ec: ExecutionContext = system.dispatcher val decider: Supervision.Decider = { case _: MyException => Supervision.Resume case _ => Supervision.Stop } val flow = Flow[Int] .map(n => if (n % 2 == 1) throw MyException(n) else n ) val in = 1 to 10 val outFuture = Source(in) .via(flow) .withAttributes(ActorAttributes.supervisionStrategy(decider)) .runWith(Sink.seq) outFuture.onComplete { case Success(out) => println("dropped elements are " + (in.diff(out))) case Failure(_) => println("unknown failure") } } }
控制台输出为:
dropped elements are Vector(1, 3, 5, 7, 9)
参考:How to get object that caused failure in Akka Streams?
1条答案
按热度按时间z6psavjg1#
当akka流抛出异常时,您希望使用
Resume
而不是Stop
。在收集了所有成功的元素后,您可以使用Seq#diff
来告诉由于抛出异常而丢弃了哪些元素。控制台输出为:
参考:How to get object that caused failure in Akka Streams?