加入 后 筛选 - pyspark

pobjuy32  于 2022-11-21  发布在  Spark
关注(0)|答案(2)|浏览(114)

假设我有大 Dataframe X(大约500,000行)和小 Dataframe y(大约3000行)。我需要在这些df之间进行连接,然后我需要对结果df进行过滤。我最近意识到我可以对X进行过滤,这将给予我与对结果连接df进行过滤相同的结果。过滤确保了我真正的小df。
此代码已在使用中。我的问题是:spark是否足够聪明,能够在连接操作之前进行过滤,并“简化”连接?或者这只是一个小小的改进。

zbdgwd5y

zbdgwd5y1#

这可能取决于你的源代码和使用情况,但总的来说,Spark有优化规则,将过滤器推到源代码
如果现在在您的生产代码上读取了这些数据,您可以在SparkUI中进行检查
这里我有一个关于我的databricks集群的小例子:

import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", false)

val input = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/***@gmail.com/city_temperature.csv")
val dataForInput2 = Seq(("Algeria", "3"),("Germany", "3"), ("France", "5"), ("Poland", "7"), ("test55", "86"))
val input2 = dataForInput2.toDF("Country", "Value")
val joinedDfs = input.join(input2, Seq("Country"))
val finalResult = joinedDfs.filter(input("Country") === "Poland")
finalResult.show

在查询计划中,您可以看到筛选器是在联接之前推送和完成的:

r7knjye2

r7knjye22#

此代码已在使用中。我的问题是:spark是否足够聪明,能够在连接操作之前进行过滤,并“简化”连接?或者这只是一个小小的改进。
是的,您所看到的称为** predicate 下推过滤器下推**。支持取决于数据源,但通常 predicate 下推利用数据源来为它进行过滤,这节省了时间和计算能力。更多信息-https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Optimizer-PushDownPredicate.html

相关问题