我尝试将传入的Source[ByteString, Any]
放入2 sinks
中,并尝试将传入的流复制到akka streaming graphs
中。我得到所需的'is'
作为输入流,但bs
不是Source[ByteString, Any]
类型。我得到了boxed error
。
private def duplicateStream(content: Source[ByteString, Any]): Future[Either[X, Y]] = {
val sink = StreamConverters.asInputStream()
val (is, bs): (InputStream, Future[ByteString]) = content
.alsoToMat(sink)(Keep.right)
.toMat(Sink.last)(Keep.both)
.run()
//is is input stream which is desired
//bs should be Source[ByteString, Any]
}
我怎样才能从这个图中得到bs
作为Source[ByteString, Any]
?
2条答案
按热度按时间jjhzyzn01#
这可以通过在
akka streams
中使用[Broadcast]
来实现。它可以用来创建多个流并独立使用它们。参考:https://doc.akka.io/docs/akka/current/stream/operators/Broadcast.html#broadcast
nwwlzxa72#
您正在使用
Sink.last
,根据定义,它将只保留源代码的最后一项,并具体化为该项的Future
。因此,您所观察到的,您有两个汇的物化值:输入流和最后一个值。