In Spark 3.1.1, I am running a groupBy on a DataFrame (no distinct involved). I set the following config:
spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")
and then run this query:
df.groupBy("method").agg(sum("request_body_len"))
but Spark still ends up performing a partial aggregation, as the physical plan shows:
== Physical Plan ==
*(2) HashAggregate(keys=[method#23], functions=[sum(cast(request_body_len#28 as bigint))], output=[method#23, sum(request_body_len)#142L])
+- Exchange hashpartitioning(method#23, 200), ENSURE_REQUIREMENTS, [id=#58]
+- *(1) HashAggregate(keys=[method#23], functions=[partial_sum(cast(request_body_len#28 as bigint))], output=[method#23, sum#146L])
+- FileScan csv [method#23,request_body_len#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/http.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<method:string,request_body_len:int>
I tried this after watching this YouTube video (at 56:53).
Is this feature no longer available in recent Spark versions, or am I missing something?
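For reference, here is a minimal Scala sketch of the full sequence described above. The CSV path and column names are taken from the FileScan line in the physical plan and may differ in your environment; the schema inference options are assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder()
  .appName("partial-agg-test")
  .getOrCreate()

// Setting an arbitrary key never fails, even if Spark does not recognise it.
spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/mnt/http.csv")

// explain() prints the physical plan, which still contains the
// partial_sum HashAggregate before the Exchange.
df.groupBy("method").agg(sum("request_body_len")).explain()
```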
1 Answer
The config spark.sql.aggregate.partialaggregate.skip.enabled doesn't exist in the Spark source code. All the spark.sql.* configurations are defined in the SQLConf object, and this one is not there: https://github.com/apache/spark/blob/651904a2ef57d2261ea6e256e4f3cdd26aa1b69d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala. As far as I checked, this config was not available in previous versions either.
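You can verify this yourself from a running session. The sketch below assumes an active SparkSession named spark; spark.conf.set accepts any key, so the absence of an error proves nothing, whereas "SET -v" lists only the SQL configs actually registered in SQLConf, and this key does not appear there.

```scala
import org.apache.spark.sql.functions.col

val key = "spark.sql.aggregate.partialaggregate.skip.enabled"

spark.conf.set(key, "true")              // silently accepted for any key
println(spark.conf.getOption(key))       // Some(true), only because we just set it

// "SET -v" returns all registered SQL configs (columns: key, value, meaning)
val registered = spark.sql("SET -v").filter(col("key") === key)
println(registered.count())              // 0: the key is not a registered config
```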
In the video you shared, the relevant slide links to a PR that enables skipping partial aggregation, but it seems that it was never merged.
There was a PR against the Spark project that addresses this issue and adds this config (it may be the same PR presented in the video, I don't know), but it was closed: https://github.com/apache/spark/pull/28804