akka 将Source[ByteString,Any]从流中提取到2个接收器中

yyyllmsg  于 2023-10-18  发布在  其他
关注(0)|答案(2)|浏览(181)

我尝试将传入的Source[ByteString, Any]放入2 sinks中,并尝试将传入的流复制到akka streaming graphs中。我得到所需的'is'作为输入流,但bs不是Source[ByteString, Any]类型。我得到了boxed error

private def duplicateStream(content: Source[ByteString, Any]): Future[Either[X, Y]] = {
val sink = StreamConverters.asInputStream()
    val (is, bs): (InputStream, Future[ByteString]) = content
      .alsoToMat(sink)(Keep.right)
      .toMat(Sink.last)(Keep.both)
      .run()

//is is input stream which is desired
//bs should be Source[ByteString, Any]
}

我怎样才能从这个图中得到bs作为Source[ByteString, Any]

jjhzyzn0

jjhzyzn01#

这可以通过在akka streams中使用[Broadcast]来实现。它可以用来创建多个流并独立使用它们。

val source: Source[Int, NotUsed] =
  Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100)

val countSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => acc + 1)
val minSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.min(acc, elem))
val maxSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.max(acc, elem))

val (count: Future[Int], min: Future[Int], max: Future[Int]) =
  RunnableGraph
    .fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) {
      implicit builder => (countS, minS, maxS) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](3))
        source ~> broadcast
        broadcast.out(0) ~> countS
        broadcast.out(1) ~> minS
        broadcast.out(2) ~> maxS
        ClosedShape
    })
    .run()

参考:https://doc.akka.io/docs/akka/current/stream/operators/Broadcast.html#broadcast

nwwlzxa7

nwwlzxa72#

您正在使用Sink.last,根据定义,它将只保留源代码的最后一项,并具体化为该项的Future
因此,您所观察到的,您有两个汇的物化值:输入流和最后一个值。

相关问题