Spark结构delta流的下推滤波器

fykwrbwg  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(459)

我有一个用例,我们需要将开放源代码的delta表流化到多个查询中,在其中一个分区列上进行过滤。如,。给定按年份列分区的增量表。

  1. Streaming query 1
  2. spark.readStream.format("delta").load("/tmp/delta-table/").
  3. where("year= 2013")
  4. Streaming query 2
  5. spark.readStream.format("delta").load("/tmp/delta-table/").
  6. where("year= 2014")

流式传输后,物理计划显示过滤器。

  1. > == Physical Plan == Filter (isnotnull(year#431) AND (year#431 = 2013))
  2. > +- StreamingRelation delta, []

我的问题是pushdown predicate 是否适用于delta中的流式查询?我们能从delta流式传输特定的分区吗?

plupiseo

plupiseo1#

如果列已经分区,则只扫描所需的分区。
让我们创建分区和非分区增量表并执行结构化流。
分区增量表流:

  1. val spark = SparkSession.builder().master("local[*]").getOrCreate()
  2. spark.sparkContext.setLogLevel("ERROR")
  3. import spark.implicits._
  4. //sample dataframe
  5. val df = Seq((1,2020),(2,2021),(3,2020),(4,2020),
  6. (5,2020),(6,2020),(7,2019),(8,2019),(9,2018),(10,2020)).toDF("id","year")
  7. //partionBy year column and save as delta table
  8. df.write.format("delta").partitionBy("year").save("delta-stream")
  9. //streaming delta table
  10. spark.readStream.format("delta").load("delta-stream")
  11. .where('year===2020)
  12. .writeStream.format("console").start().awaitTermination()

上述流式查询的物理计划:请注意partitionfilters

非分区增量表流:

  1. df.write.format("delta").save("delta-stream")
  2. spark.readStream.format("delta").load("delta-stream")
  3. .where('year===2020)
  4. .writeStream.format("console").start().awaitTermination()

上述流式查询的物理计划:请注意pushedfilters

展开查看全部

相关问题