spark结构化流媒体在不同的工作节点上处理每一行

5n0oy7gb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(506)

使用spark 2.3 structed streaming和kafka作为输入流。我的集群是建立在主人和3工人(master在一台worker机器上运行)我的kafka主题有3个分区作为worker的数量。我使用默认触发器和foreach sink来处理数据。
当第一条消息到达驱动程序时,它立即开始处理其中一个可用工作节点上的数据,在处理时,第二条消息到达,而不是立即开始在可用工作节点上处理它,处理的“执行”被延迟,直到第一个工作节点结束处理,现在所有的“等待执行”开始在所有可用的worker上并行处理(假设我有3条等待消息)
我怎样才能强迫等待的工人立即开始执行?

我的代码片段:

  1. val sparkSession = SparkSession.builder().config(conf).getOrCreate()
  2. import sparkSession.implicits._
  3. import org.apache.spark.sql.ForeachWriter
  4. val writer = new ForeachWriter[String] {
  5. override def open(partitionId: Long, version: Long) = true
  6. override def process(filePath: String) = {
  7. val filesSeq = fileHandler
  8. .handleData(filePath) // long processing
  9. }
  10. override def close(errorOrNull: Throwable) = {}
  11. }
  12. val filesDf = kafkaStreamSubscriber
  13. .buildtream(conf, kafkaInputTopic)
  14. val ds = filesDf.map(x=>x.getAs("filePath").asInstanceOf[String])
  15. val query =
  16. ds.writeStream
  17. .foreach(writer)
  18. .start
  19. ds.writeStream
  20. .format("console")
  21. .option("truncate", "false")
  22. .start()
  23. println("lets go....")
  24. query.awaitTermination()

我做错什么了?当我有等待处理的数据时,我不希望有空闲的工人
桑克斯

t30tvxxf

t30tvxxf1#

请参阅spark structured streaming triggers文档部分
据我所知,默认触发器一次处理一个微批。如果您需要在数据到达时立即处理数据,我建议您考虑实验连续模式。
我的理解是,如果您使用trigger,比如说5秒,微批处理将读取所有3个分区的消息,您将有3个任务同时运行。在它们全部完成之前,不会有微批开始。
希望有帮助!

相关问题