获取重载方法~>的替代品Akka广播

9o685dep  于 2023-10-18  发布在  其他
关注(0)|答案(1)|浏览(133)

我尝试将传入的Source[ByteString,Any]广播到2个不同的流,然后扇入(zip)输出。但是,我得到错误“重载方法~>与替代品”。

val byteStringSource: Source[ByteString, Any] = Source.fromIterator(() => (1 to 10).map(i => ByteString(s"Element $i")).iterator)

  val incrementer = Flow[String].map{ x =>
    x
  }
  val multiplier = Flow[String].map{ x =>
    x
  }

  val output = Sink.foreach[(Type1, Type2)] { n1 =>
    println(s"First obj is ${(n1._1.toString)} & second obj is ${n1._2.toString}")
  }

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create() {implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[String](2))
      val zip = builder.add(Zip[Type1, Type2])//fan-in operator

      byteStringSource ~> broadcast
      broadcast.out(0) ~> incrementer ~> zip.in0
      broadcast.out(1) ~> multiplier ~> zip.in1

      zip.out ~> output
      ClosedShape
    }
  )
  graph.run()

我该如何解决这个问题?

mdfafbf1

mdfafbf11#

您的源的类型是ByteString,但广播元素的类型是String,因此~>运算符将不适用。
对于你的简单例子,你可以删除ByteString,只使用普通字符串。如果您的真实的情况使用更复杂的类型,则可以将原始源Map到广播将接受的类型,因此类似于

val theSource = byteStringSource.map(byteString => ...)

在GraphDSL中使用theSource

相关问题