Connecting the outlet of one ZipWith stage to the inlet of another ZipWith stage

yhxst69z  posted on 2021-06-07  in Kafka

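I can't use the out of one ZipWith as the in of another ZipWith. The stream reads from only one of the Kafka topics, after which the rest of the flow stops working and no further events arrive. A single ZipWith works fine for 2 Kafka data sources. As soon as I introduce pmf and connect the outlets of pm1 and pm2 to the inlets of pmf, it stops working :( Can you help?
The data is retrieved from Kafka topics. The same graph works with data sources such as Source(1 to 100). The problem shows up only with the Kafka data source; all my tests were done with dummy data sources.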

package sample

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future

object KafkaApp extends App {
  implicit val system = ActorSystem("StreamBuilder2323")
  implicit val materializer = ActorMaterializer()

  private val s1: Source[String, Consumer.Control] = initializeKafkaSource("topic1",system)
  private val s2: Source[String, Consumer.Control] = initializeKafkaSource("topic2",system)

  private val s3: Source[String, Consumer.Control] = initializeKafkaSource("topic3",system)

  private val s4:Source[String, Consumer.Control] = initializeKafkaSource("topic4",system)

  val concat = GraphDSL.create() { implicit b ⇒
    val zip = b.add(ZipWith[String,String,String](concatFunc _))
    UniformFanInShape(zip.out, zip.in0, zip.in1)
  }

  def concatFunc(s1:String, s2:String): String ={
    s1 + " _ " + s2
  }

  def printss(s:String): Unit ={
    print(s)
  }

  val sinkFinal = Sink.foreach(printss)

  val g = RunnableGraph.fromGraph(GraphDSL.create(sinkFinal) { implicit b ⇒ sink ⇒
    import GraphDSL.Implicits._
    val pm1 = b.add(concat)
    val pm2 = b.add(concat)
    val pmf = b.add(concat)
    s1  ~> pm1.in(0)
    s2  ~> pm1.in(1)

    s3  ~> pm2.in(0)
    s4  ~> pm2.in(1)

    pm1.out ~> pmf.in(0)
    pm2.out ~> pmf.in(1)

    pmf.out ~> sink.in

    ClosedShape
  })

  val max: Future[Done] = g.run()

  def initializeKafkaSource(topicName : String, system : ActorSystem): Source[String,Consumer.Control] ={
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("consumerGroup")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val subscription = Subscriptions.topics(topicName)
    val streamSource = Consumer.plainSource(consumerSettings, subscription).map(s => s.value())
    streamSource
  }
}
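
For comparison, the dummy-source variant that the question reports as working can be sketched roughly as below. This is only an illustration under stated assumptions: the object name `DummySourceGraph`, the helper `dummy`, and the prefixes `a` to `d` are hypothetical, the Kafka sources are replaced by `Source(1 to 100)`, and the results are collected with `Sink.seq` instead of being printed by `Sink.foreach` so they are easier to inspect. The wiring of `pm1`, `pm2` and `pmf` is the same as in the listing above, which may help to narrow the problem down to the Kafka sources rather than the graph shape.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}

object DummySourceGraph {
  // Same zipper function as in the question.
  def concatFunc(s1: String, s2: String): String = s1 + " _ " + s2

  // Hypothetical stand-in for a Kafka topic: emits prefix1 .. prefix100, then completes.
  def dummy(prefix: String): Source[String, NotUsed] =
    Source(1 to 100).map(i => s"$prefix$i")

  // Reusable two-input fan-in built around a single ZipWith stage.
  val concat = GraphDSL.create() { implicit b =>
    val zip = b.add(ZipWith[String, String, String](concatFunc _))
    UniformFanInShape(zip.out, zip.in0, zip.in1)
  }

  // Assumption: Sink.seq replaces Sink.foreach so the zipped values can be checked.
  val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[String]) { implicit b => sink =>
    import GraphDSL.Implicits._
    val pm1 = b.add(concat)
    val pm2 = b.add(concat)
    val pmf = b.add(concat)

    dummy("a") ~> pm1.in(0)
    dummy("b") ~> pm1.in(1)

    dummy("c") ~> pm2.in(0)
    dummy("d") ~> pm2.in(1)

    // Outlets of the first two ZipWith stages feed the inlets of the third.
    pm1.out ~> pmf.in(0)
    pm2.out ~> pmf.in(1)

    pmf.out ~> sink.in
    ClosedShape
  })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("DummySourceGraph")
    // ActorMaterializer is used as in the question (Akka 2.5 style).
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    graph.run().onComplete { result =>
      // Each line has the form a<i> _ b<i> _ c<i> _ d<i>.
      result.foreach(_.foreach(println))
      system.terminate()
    }
  }
}
```

Because every ZipWith stage emits only once it has an element on each of its inlets, the final stage produces output only while all four sources keep delivering elements. With the dummy sources that is always the case, so a stall that appears only with Kafka points at one of the topic sources not delivering, not at the graph wiring.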
