使用具有countDistinct函数的字典的Pyspark聚集

0sgqnhkj  于 2022-11-01  发布在  Spark
关注(0)|答案(2)|浏览(139)

我尝试在 Dataframe 上运行聚合。然后我想计算每一列上的不同值。我生成了一个聚合字典,如下所示:

from pyspark.sql.functions import countDistinct

expr = {x: "countDistinct" for x in df.columns if x is not 'id'}
df.groupBy("id").agg(expr).show()

我收到错误:
分析异常:未定义的函数:“countdistinct”。此函数既不是注册的临时函数,也不是在数据库“default”中注册的永久函数。
如果我直接使用'countDistinct',它就可以工作:

df.groupBy("id").agg(countDistinct('hours'))

输出[1]: Dataframe [id:int,计数(小时):二进制数]
这不起作用:

df.groupBy("id").agg({'hours':'countDistinct'}).show()

分析异常:未定义的函数:“countdistinct”。此函数既不是注册的临时函数,也不是在数据库“default”中注册的永久函数。
有什么想法可以解决这个问题吗?

siv3szwd

siv3szwd1#

countDistinct似乎不是一个“内置聚合函数”。
将非重复计数的列直接传递给agg可以解决此问题:

cols = [countDistinct(x) for x in df.columns if x != 'id']

df.groupBy('id').agg(*cols).show()
vyu0f0g1

vyu0f0g12#

如果我们做一些像这样的事情会更灵活

from pyspark.sql.functions import countDistinct, sum

agg_expn = {'hours':countDistinct, 'somethingelse': sum}

temp = [agg_expn[col] for col in df.columns if agg_expn.get(col, None)]

df.groupby(['id']).agg(*temp)

这样就可以为不同的列使用不同的集合函数

相关问题