获取重载方法~>的替代品Akka广播

9o685dep  于 2023-10-18  发布在  其他
关注(0)|答案(1)|浏览(194)

我尝试将传入的Source[ByteString,Any]广播到2个不同的流,然后扇入(zip)输出。但是,我得到错误“重载方法~>与替代品”。

  1. val byteStringSource: Source[ByteString, Any] = Source.fromIterator(() => (1 to 10).map(i => ByteString(s"Element $i")).iterator)
  2. val incrementer = Flow[String].map{ x =>
  3. x
  4. }
  5. val multiplier = Flow[String].map{ x =>
  6. x
  7. }
  8. val output = Sink.foreach[(Type1, Type2)] { n1 =>
  9. println(s"First obj is ${(n1._1.toString)} & second obj is ${n1._2.toString}")
  10. }
  11. val graph = RunnableGraph.fromGraph(
  12. GraphDSL.create() {implicit builder: GraphDSL.Builder[NotUsed] =>
  13. import GraphDSL.Implicits._
  14. val broadcast = builder.add(Broadcast[String](2))
  15. val zip = builder.add(Zip[Type1, Type2])//fan-in operator
  16. byteStringSource ~> broadcast
  17. broadcast.out(0) ~> incrementer ~> zip.in0
  18. broadcast.out(1) ~> multiplier ~> zip.in1
  19. zip.out ~> output
  20. ClosedShape
  21. }
  22. )
  23. graph.run()

我该如何解决这个问题?

mdfafbf1

mdfafbf11#

您的源的类型是ByteString,但广播元素的类型是String,因此~>运算符将不适用。
对于你的简单例子,你可以删除ByteString,只使用普通字符串。如果您的真实的情况使用更复杂的类型,则可以将原始源Map到广播将接受的类型,因此类似于

  1. val theSource = byteStringSource.map(byteString => ...)

在GraphDSL中使用theSource

相关问题