Akka Stream的保持右/左/两者如何导致不同的输出?

4uqofj5v  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(184)

我正在尝试理解Keep在Akka流中是如何工作的。阅读What does Keep in akka stream mean中的答案,我知道它有助于控制我们从物化器的左/右/两侧获得结果。但是,我仍然无法构建一个可以改变左/右的值并获得不同结果的示例。
例如,

implicit val system: ActorSystem = ActorSystem("Playground")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val sentenceSource = Source(List(
  "Materialized values are confusing me",
  "I love streams",
  "Left foo right bar"
))

val wordCounter = Flow[String].fold[Int](0)((currentWords, newSentence) => currentWords + newSentence.split(" ").length)
val result = sentenceSource.viaMat(wordCounter)(Keep.left).toMat(Sink.head)(Keep.right).run()

val res = Await.result(result, 2 second)
println(res)

在本例中,如果我将值从keep left更改为keep right,我仍然得到相同的结果。有人能提供一个基本示例,说明将keep更改为left/right/这两个值会导致不同的结果吗?

q9yhzks0

q9yhzks01#

在您的示例中,由于:

sentenceSource: akka.stream.scaladsl.Source[String,akka.NotUsed] = ???
wordCounter: akka.stream.scaladsl.Flow[String,Int,akka.NotUsed] = ???

两者都将NotUsed作为其物化(指示它们没有有用物化),

sentenceSource.viaMat(wordCounter)(Keep.right)
sentenceSource.viaMat(wordCounter)(Keep.left)

但是,由于Sink.head[T]物化为Future[T],因此更改组合器显然会产生影响

val intSource = sentenceSource.viaMat(wordCounter)(Keep.right)

val notUsed = intSource.toMat(Sink.head)(Keep.left)
// akka.stream.scaladsl.RunnableGraph[akka.NotUsed]

val intFut = intSource.toMat(Sink.head)(Keep.right)
// akka.stream.scaladsl.RunnableGraph[scala.concurrent.Future[Int]]

notUsed.run    // akka.NotUsed

intFut.run     // Future(Success(12))

Source中的大多数源实体化为NotUsed,几乎所有常见的Flow操作符也是如此,所以toMat(someSink)(Keep.right)(或等价的.runWith(someSink))比使用Keep.leftKeep.both要普遍得多。源/流实体化最常见的用例是提供某种控制平面,例如:

import akka.Done
import akka.stream.{ CompletionStrategy, OverflowStrategy }

import system.dispatcher

val completionMatcher: PartialFunction[Any, CompletionStrategy] = { case Done => CompletionStrategy.draining }
val failureMatcher: PartialFunction[Any, Throwable] = { case 666 => new Exception("""\m/""") }

val sentenceSource = Source.actorRef[String](completionMatcher = completionMatcher, failureMatcher = failureMatcher, bufferSize = 100, overflowStrategy = OverflowStrategy.dropNew)

// same wordCounter as before
val stream = sentenceSource.viaMat(wordCounter)(Keep.left).toMat(Sink.head)(Keep.both)    // akka.stream.scaladsl.RunnableGraph[(akka.actor.ActorRef, scala.concurrent.Future[Int])]

val (sourceRef, intFut) = stream.run()

sourceRef ! "Materialized values are confusing me"
sourceRef ! "I love streams"
sourceRef ! "Left foo right bar"
sourceRef ! Done

intFut.foreach { result =>
  println(result)
  system.terminate()
}

在本例中,我们使用Keep.left来传递sentenceSource的物化值,然后使用Keep.both来同时获得该物化值和Sink.head的物化值。

相关问题