我无法定义 ZipWith out
作为 In
另一个的 ZipWith
. 它只从一个 kafka topics
然后流的其余部分就不起作用了,它也不会得到任何其他事件。单身 zipWith
为2工作
kafka datasources
. 每当我介绍 pmf
并连接 pm1
以及 pm2's
出口至入口 pmf
,不起作用:(你能帮忙吗?
数据检索自 Kafka topics
,它与 datasources
像源(1到100)。 Kafka datasource
我所有的测试都是用假人做的 datasources
.
package sample
import akka.{Done}
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future
object KafkaApp extends App {
implicit val system = ActorSystem("StreamBuilder2323")
implicit val materializer = ActorMaterializer()
private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)
private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)
private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)
val concat = GraphDSL.create() { implicit b ⇒
val zip = b.add(ZipWith[String,String,String](concatFunc _))
UniformFanInShape(zip.out, zip.in0, zip.in1)
}
def concatFunc(s1:String, s2:String): String ={
s1 + " _ " + s2
}
def printss(s:String): Unit ={
print(s)
}
val sinkFinal = Sink.foreach(printss)
val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
import GraphDSL.Implicits._
val pm1 = b.add(concat)
val pm2 = b.add(concat)
val pmf = b.add(concat)
s1 ~> pm1.in(0)
s2 ~> pm1.in(1)
s3 ~> pm2.in(0)
s4 ~> pm2.in(1)
pm1.out ~> pmf.in(0)
pm2.out ~> pmf.in(1)
pmf.out ~> sink.in
ClosedShape
})
val max: Future[Done] = g.run()
def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("consumerGroup")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val subscription = Subscriptions.topics(topicName)
val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
streamSource
}
}
暂无答案!
目前还没有任何答案,快来回答吧!