spark结构化流媒体在不同的工作节点上处理每一行

5n0oy7gb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(442)

使用spark 2.3 structed streaming和kafka作为输入流。我的集群是建立在主人和3工人(master在一台worker机器上运行)我的kafka主题有3个分区作为worker的数量。我使用默认触发器和foreach sink来处理数据。
当第一条消息到达驱动程序时,它立即开始处理其中一个可用工作节点上的数据,在处理时,第二条消息到达,而不是立即开始在可用工作节点上处理它,处理的“执行”被延迟,直到第一个工作节点结束处理,现在所有的“等待执行”开始在所有可用的worker上并行处理(假设我有3条等待消息)
我怎样才能强迫等待的工人立即开始执行?

我的代码片段:

val sparkSession = SparkSession.builder().config(conf).getOrCreate()
import sparkSession.implicits._

import org.apache.spark.sql.ForeachWriter

val writer = new ForeachWriter[String] {
  override def open(partitionId: Long, version: Long) = true
  override def process(filePath: String) = {
    val filesSeq = fileHandler
      .handleData(filePath) // long processing

  }
  override def close(errorOrNull: Throwable) = {}
}

val filesDf = kafkaStreamSubscriber
  .buildtream(conf, kafkaInputTopic)

val ds = filesDf.map(x=>x.getAs("filePath").asInstanceOf[String])

val query =
  ds.writeStream        
    .foreach(writer)
    .start

ds.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

println("lets go....")

query.awaitTermination()

我做错什么了?当我有等待处理的数据时,我不希望有空闲的工人
桑克斯

t30tvxxf

t30tvxxf1#

请参阅spark structured streaming triggers文档部分
据我所知,默认触发器一次处理一个微批。如果您需要在数据到达时立即处理数据,我建议您考虑实验连续模式。
我的理解是,如果您使用trigger,比如说5秒,微批处理将读取所有3个分区的消息,您将有3个任务同时运行。在它们全部完成之前,不会有微批开始。
希望有帮助!

相关问题