akka 将Source[ByteString,Any]从流中提取到2个接收器中

yyyllmsg  于 2023-10-18  发布在  其他
关注(0)|答案(2)|浏览(220)

我尝试将传入的Source[ByteString, Any]放入2 sinks中,并尝试将传入的流复制到akka streaming graphs中。我得到所需的'is'作为输入流,但bs不是Source[ByteString, Any]类型。我得到了boxed error

  1. private def duplicateStream(content: Source[ByteString, Any]): Future[Either[X, Y]] = {
  2. val sink = StreamConverters.asInputStream()
  3. val (is, bs): (InputStream, Future[ByteString]) = content
  4. .alsoToMat(sink)(Keep.right)
  5. .toMat(Sink.last)(Keep.both)
  6. .run()
  7. //is is input stream which is desired
  8. //bs should be Source[ByteString, Any]
  9. }

我怎样才能从这个图中得到bs作为Source[ByteString, Any]

jjhzyzn0

jjhzyzn01#

这可以通过在akka streams中使用[Broadcast]来实现。它可以用来创建多个流并独立使用它们。

  1. val source: Source[Int, NotUsed] =
  2. Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(100))).take(100)
  3. val countSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => acc + 1)
  4. val minSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.min(acc, elem))
  5. val maxSink: Sink[Int, Future[Int]] = Sink.fold(0)((acc, elem) => math.max(acc, elem))
  6. val (count: Future[Int], min: Future[Int], max: Future[Int]) =
  7. RunnableGraph
  8. .fromGraph(GraphDSL.createGraph(countSink, minSink, maxSink)(Tuple3.apply) {
  9. implicit builder => (countS, minS, maxS) =>
  10. import GraphDSL.Implicits._
  11. val broadcast = builder.add(Broadcast[Int](3))
  12. source ~> broadcast
  13. broadcast.out(0) ~> countS
  14. broadcast.out(1) ~> minS
  15. broadcast.out(2) ~> maxS
  16. ClosedShape
  17. })
  18. .run()

参考:https://doc.akka.io/docs/akka/current/stream/operators/Broadcast.html#broadcast

展开查看全部
nwwlzxa7

nwwlzxa72#

您正在使用Sink.last,根据定义,它将只保留源代码的最后一项,并具体化为该项的Future
因此,您所观察到的,您有两个汇的物化值:输入流和最后一个值。

相关问题