使用不同选项的akka流物化值

bksxznpy  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(116)

我是akka流的新手,想了解流中的实体化是如何工作的

//Print sum of elements from 1 to 10
val newSource = Source(1 to 10)
val flow      = Flow[Int].fold(0)((a, b) => a + b)
val sink      = Sink.foreach(println)
val sumFuture = newSource.via(flow).toMat(sink)(Keep.left).run()

它使用Keep.leftKeep.right打印值55。这两者有何区别?
我想了解一下Keep.leftKeep.right给予的不同值,以及如何使用Keep.both

jyztefdp

jyztefdp1#

物化值可以由接收器和源产生。可以通过将源与接收器组合来创建可运行的图。Keep定义组合时要保留哪个物化值

  • Keep.right选择汇的物化值
  • Keep.left选择源的物化值
  • Keep.both以元组的形式选取两者
  • Keep.none忽略这两者,并选取NotUsed,即指示没有物化值的标记。

默认情况下,Keep.left用于操作viato等。
以下示例强调了这一点
给定Source[Int, String]Sink[Int, Future[Int]]

val source: Source[Int, String] = Source(List(1, 2, 3)).mapMaterializedValue(_ => "Source Mat Value")
val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

我们可以将sourcesink组合起来,创建一个具有不同物化值的可运行图。

val left: String = source.to(sink).run() //same as toMat(...)(Keep.left)
val right: Future[Int] = source.toMat(sink)(Keep.right).run()
val both: (String, Future[Int]) = source.toMat(sink)(Keep.both).run()

现在,如果我们运行它并打印它产生的每个物化值

left=Source Mat Value
right=Future(Success(6))
both=(Source Mat Value,Future(Success(6)))

请不要混淆物化价值与流元素的处理。
考虑以下fold阶段

val flowFold: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val sinkFold: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)

flowFoldfold函数应用于流中的每个元素,并将一个表示fold结果的单个值推送到下游。如果需要,可以进一步处理此元素。
然而,sinkFold是图中的最后一个阶段,它不能将元素进一步推向下游。当图处理完所有元素并完成时,它使用物化值Future[Int]返回fold结果。
如果Flow.fold的值是55,则这应该是流的具体化值而不是NotUsed
否,值55不是具体化值。它作为元素被推送到下游接收器。
你可以在Sink.head的帮助下“捕捉”物化值中的元素55

val flow: Flow[Int, Int, NotUsed] = Flow[Int].fold(0)(_ + _)
val alternativeFoldSink: Sink[Int, Future[Int]] = flow.toMat(Sink.head)(Keep.right)

每一个阶段都能产生物化价值,那么(为什么)Flow.fold不能产生物化价值。
是的,每个阶段都可能产生物化值。但是Flow.fold的设计并不是这样的。大多数Flow的定义都没有提供物化值。如果你想使用物化值和fold,我建议使用Sink.fold

klh5stk1

klh5stk12#

重要的是要记住,一个流阶段可以有

  • 返回一个具体化值,它是在流具体化时,在任何元素通过该具体化之前创建的。因此,它不能依赖于通过/传入/传出流的值。
  • 在流运行时传递到流的下一级的零个或多个输出值。

每个阶段都有一个具体化的值。每个不是接收器的阶段都可能有输出值。对于源,一般来说,具体化的值提供了一些影响流的行为的方法(例如,Source.actorRef的物化值是ActorRef,它允许您通过向ActorRef发送消息来将元素推送到流中,或者AlpakkaKafka中的各种Kafka消费者来源允许您停止从Kafka消费,而不停止流,直到流被排干)。
通常,对于接收器,从接收器获取值的唯一方法是通过物化值由于必须在任何数据流过流之前创建物化值,这就是为什么大多数接收器具体化为Future(尚未可用的数据的占位符),并且通常在流完成之前不会完成该值(因为Future是最多写一次的)。
每个阶段都有具体化值,但并非每个阶段都有有有意义的具体化值:对于那些,特殊的NotUsed值(一个单例)编码为“无意义”。2大多数流阶段都属于这一类别:它们存在只是为了将输入转换为输出。

相关问题