使用kafka在长时间运行的spark作业之间进行通信

lnvxswe2  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(536)

我是apachespark新手,需要在spark集群上同时运行几个长时间运行的进程(作业)。通常,这些单独的流程(每个流程都是自己的工作)需要相互通信。我正在尝试使用Kafka作为这些过程之间的中间人。因此,高级别的工作间沟通看起来像:
作业#1做一些工作,并将消息发布到Kafka主题
作业#2设置为流式接收器(使用 StreamingContext )同一个Kafka主题,一旦消息发布到该主题,job#2就会使用它
作业#2现在可以根据它所消耗的消息做一些工作
据我所知,流式处理上下文正在阻止在spark驱动程序节点上运行的侦听器。这意味着一旦我像这样启动流媒体消费者:

def createKafkaStream(ssc: StreamingContext,
        kafkaTopics: String, brokers: String): DStream[(String, 
        String)] = {
    // some configs here
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.collect().foreach { msg =>
            // Now do some work as soon as we receive a messsage from the topic
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

…现在有两种含义:
司机现在堵住并倾听Kafka的工作消耗;和
当接收到工作(消息)时,它们被发送到任何可用的工作节点,以便实际执行
所以,首先,如果我上面说的任何东西是不正确的或是误导性的,请先纠正我!假设我或多或少是正确的,那么我只是想知道,根据我的标准,是否有一种更具伸缩性或性能的方法来实现这一点。同样,我有两个长时间运行的作业(作业1和作业2)正在我的spark节点上运行,其中一个需要能够将工作“发送”给另一个。有什么想法吗?

kyxcudwk

kyxcudwk1#

据我所知,流式处理上下文正在阻止在spark驱动程序节点上运行的侦听器。
StreamingContext (单数)不是阻塞侦听器。它的任务是为流式处理作业创建执行图。
当您开始从kafka读取数据时,您指定每10秒获取一次新记录。从现在起发生的事情取决于你对Kafka使用的是哪种Kafka抽象,或者是通过 KafkaUtils.createStream ,或通过 KafkaUtils.createDirectStream .
一般来说,这两种方法都是从kafka中消耗数据,然后将数据分配给每个spark worker进行并行处理。
那么我只是想知道是否有一个更具可扩展性或性能的方法来实现这一点
这种方法具有高度可扩展性。当使用无接收器方法时,每个kafka分区Map到给定rdd中的一个spark分区。您可以通过增加kafka中的分区数量,或者通过在spark中重新分区数据(使用 DStream.repartition ). 我建议测试这个设置,以确定它是否适合您的性能要求。

相关问题