val dimensionFields = List("col1")
val metrics = List("col2", "col3", "col4")
val columnOfInterests = dimensions ++ metrics
val df = spark.read.table("some_table")
.select(columnOfInterests.map(c => col(c)):_*)
.groupBy(dimensions.map(d => col(d)): _*)
.agg(metrics.map( m => m -> "sum").toMap)
.toDF(columnOfInterests:_*) // that's the interesting part
from pyspark.sql.functions import expr
dims = ["col1"]
mertrics = ["col2", "col3", "col4"]
aggFuncs = ["sum", "avg", "max"]
aggs = [expr(f"{a}({c}) as {c}_{a}") for c in metrics for a in aggFuncs]
# or you can also do below, they are equivalent
# aggs = [expr(f"{a}({c})").alias(f"{c}_{a}") for c in metrics for a in aggFuncs]
df.groupBy(*dims).agg(*aggs).show()
4条答案
按热度按时间t1rydlwq1#
将聚合函数应用于多个列的方法有多种。
GroupedData
类为最常用的函数提供了很多方法,包括count
、max
、min
、mean
、sum
,可以直接使用如下:您可以选择传递一个应该聚合的列的列表
您还可以将dictionary/map的列a作为键,将函数作为值进行传递:
最后你可以使用varargs:
还有一些其他的方法来达到类似的效果,但这些应该绰绰有余的大部分时间。
另见:
jrcvhitl2#
这是同一概念的另一个示例,假设您有2个不同的列,并且您希望对每个列应用不同的agg函数,即
下面是实现它的方法-尽管我还不知道如何在这种情况下添加别名
请参阅以下示例-使用Map
ekqde3dh3#
当前的答案在如何创建聚合方面是完全正确的,但是没有一个答案实际上解决了问题中也要求的列别名/重命名问题。
通常,我是这样处理这种情况的:
最后一行实际上将聚合 Dataframe 的每一列重命名为原始字段,实际上将
sum(col2)
和sum(col3)
简单地更改为col2
和col3
。btqmn9zl4#
对于pyspark,你可以使用下面的,它可以合并不同的列名和聚合函数,威尔斯重命名。