Apache Spark DataFrame: how to turn off partial aggregation when using groupBy?

Posted by apeeds0o on 2022-12-09 in HDFS

In Spark 3.1.1, I ran a groupBy on a DataFrame without using distinct. To turn off partial aggregation, I set:

spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")

and then ran the query:

df.groupBy("method").agg(sum("request_body_len"))

Spark still ends up performing a partial aggregation, as the physical plan shows:

== Physical Plan ==
*(2) HashAggregate(keys=[method#23], functions=[sum(cast(request_body_len#28 as bigint))], output=[method#23, sum(request_body_len)#142L])
+- Exchange hashpartitioning(method#23, 200), ENSURE_REQUIREMENTS, [id=#58]
   +- *(1) HashAggregate(keys=[method#23], functions=[partial_sum(cast(request_body_len#28 as bigint))], output=[method#23, sum#146L])
      +- FileScan csv [method#23,request_body_len#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/http.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<method:string,request_body_len:int>
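
For readers less familiar with this plan shape: the lower HashAggregate computes partial_sum within each input partition, the Exchange shuffles those partial results by key, and the upper HashAggregate merges them into the final sum. The following is a minimal pure-Python sketch of that two-phase pattern. It is not Spark code, and the function names and sample rows are made up purely for illustration.

```python
from collections import defaultdict

def partial_sum(partition):
    """Phase 1: sum values per key within a single partition (before the shuffle)."""
    acc = defaultdict(int)
    for method, body_len in partition:
        acc[method] += body_len
    return dict(acc)

def final_sum(partials):
    """Phase 2: merge the per-partition partial sums (after the shuffle)."""
    total = defaultdict(int)
    for part in partials:
        for method, subtotal in part.items():
            total[method] += subtotal
    return dict(total)

# Hypothetical sample data: two partitions of (method, request_body_len) rows.
partitions = [
    [("GET", 0), ("POST", 120), ("GET", 0)],
    [("POST", 80), ("PUT", 40)],
]

partials = [partial_sum(p) for p in partitions]  # one small dict per partition
result = final_sum(partials)                     # merged totals per method
print(result)
```

Skipping partial aggregation would correspond to shuffling every raw row by key and summing only once after the shuffle, which can be cheaper when almost every key is unique within a partition.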

I tried this after watching this video on YouTube: at 56:53.
Is this feature no longer available in the latest Spark, or am I missing something?

s4chpxco


The config spark.sql.aggregate.partialaggregate.skip.enabled does not exist in the Spark source code.
All spark.sql.* configurations are defined in the SQLConf object, and this one is not there: https://github.com/apache/spark/blob/651904a2ef57d2261ea6e256e4f3cdd26aa1b69d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
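
One way to repeat this check is to search the text of SQLConf.scala for the key. The sketch below assumes entries in that file are declared through buildConf("...") calls; the helper name defined_conf_keys and the sample excerpt are hypothetical stand-ins, not the real file contents.

```python
import re

def defined_conf_keys(sqlconf_source: str) -> set:
    """Collect every key declared as buildConf("...") in the given Scala source text."""
    return set(re.findall(r'buildConf\(\s*"([^"]+)"\s*\)', sqlconf_source))

# Hypothetical excerpt standing in for the downloaded SQLConf.scala text.
sample = '''
  val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
    .doc("Placeholder doc string for this illustration.")
'''

keys = defined_conf_keys(sample)
print("spark.sql.shuffle.partitions" in keys)
print("spark.sql.aggregate.partialaggregate.skip.enabled" in keys)
```

Running the same helper over the full file from the link above should show whether the key from the question is declared there.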
As far as I can tell, this config was not available in earlier versions either.
In the video you shared, the relevant slide links to a PR that enables skipping partial aggregation, but it seems it was never merged.
There was a PR against the Spark project that addresses this issue and adds this config (possibly the same PR presented in the video, I don't know), but it was closed: https://github.com/apache/spark/pull/28804
