我尝试将传入的Source[ByteString,Any]广播到2个不同的流,然后扇入(zip)输出。但是,我得到错误“重载方法~>与替代品”。
val byteStringSource: Source[ByteString, Any] = Source.fromIterator(() => (1 to 10).map(i => ByteString(s"Element $i")).iterator)
val incrementer = Flow[String].map{ x =>
x
}
val multiplier = Flow[String].map{ x =>
x
}
val output = Sink.foreach[(Type1, Type2)] { n1 =>
println(s"First obj is ${(n1._1.toString)} & second obj is ${n1._2.toString}")
}
val graph = RunnableGraph.fromGraph(
GraphDSL.create() {implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[String](2))
val zip = builder.add(Zip[Type1, Type2])//fan-in operator
byteStringSource ~> broadcast
broadcast.out(0) ~> incrementer ~> zip.in0
broadcast.out(1) ~> multiplier ~> zip.in1
zip.out ~> output
ClosedShape
}
)
graph.run()
我该如何解决这个问题?
1条答案
按热度按时间mdfafbf11#
您的源的类型是
ByteString
,但广播元素的类型是String
,因此~>
运算符将不适用。对于你的简单例子,你可以删除ByteString,只使用普通字符串。如果您的真实的情况使用更复杂的类型,则可以将原始源Map到广播将接受的类型,因此类似于
在GraphDSL中使用
theSource
。