假设我有大 Dataframe X(大约500,000行)和小 Dataframe y(大约3000行)。我需要在这些df之间进行连接,然后我需要对结果df进行过滤。我最近意识到我可以对X进行过滤,这将给予我与对结果连接df进行过滤相同的结果。过滤确保了我真正的小df。此代码已在使用中。我的问题是:spark是否足够聪明,能够在连接操作之前进行过滤,并“简化”连接?或者这只是一个小小的改进。
zbdgwd5y1#
这可能取决于你的源代码和使用情况,但总的来说,Spark有优化规则,将过滤器推到源代码如果现在在您的生产代码上读取了这些数据,您可以在SparkUI中进行检查这里我有一个关于我的databricks集群的小例子:
import org.apache.spark.sql.functions._ spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) spark.conf.set("spark.sql.adaptive.enabled", false) val input = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/***@gmail.com/city_temperature.csv") val dataForInput2 = Seq(("Algeria", "3"),("Germany", "3"), ("France", "5"), ("Poland", "7"), ("test55", "86")) val input2 = dataForInput2.toDF("Country", "Value") val joinedDfs = input.join(input2, Seq("Country")) val finalResult = joinedDfs.filter(input("Country") === "Poland") finalResult.show
在查询计划中,您可以看到筛选器是在联接之前推送和完成的:
r7knjye22#
此代码已在使用中。我的问题是:spark是否足够聪明,能够在连接操作之前进行过滤,并“简化”连接?或者这只是一个小小的改进。是的,您所看到的称为** predicate 下推或过滤器下推**。支持取决于数据源,但通常 predicate 下推利用数据源来为它进行过滤,这节省了时间和计算能力。更多信息-https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Optimizer-PushDownPredicate.html
2条答案
按热度按时间zbdgwd5y1#
这可能取决于你的源代码和使用情况,但总的来说,Spark有优化规则,将过滤器推到源代码
如果现在在您的生产代码上读取了这些数据,您可以在SparkUI中进行检查
这里我有一个关于我的databricks集群的小例子:
在查询计划中,您可以看到筛选器是在联接之前推送和完成的:
r7knjye22#
此代码已在使用中。我的问题是:spark是否足够聪明,能够在连接操作之前进行过滤,并“简化”连接?或者这只是一个小小的改进。
是的,您所看到的称为** predicate 下推或过滤器下推**。支持取决于数据源,但通常 predicate 下推利用数据源来为它进行过滤,这节省了时间和计算能力。更多信息-https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Optimizer-PushDownPredicate.html