我有如下数据
-----------------------------
place | key | weights
----------------------------
amazon | lion | [ 34, 23, 56 ]
north | bear | [ 90, 45]
amazon | lion | [ 38, 30, 50 ]
amazon | bear | [ 45 ]
amazon | bear | [ 40 ]
我试图得到如下结果
-----------------------------
place | key | average
----------------------------
amazon | lion1 | 36.0 #(34 + 38)/2
amazon | lion2 | 26.5 #(23 + 30)/2
amazon | lion3 | 53.0 #(50 + 56)/2
north | bear1 | 90 #(90)/1
north | bear2 | 45 #(45)/1
amazon | bear1 | 42.5 #(45 + 40)/2
我明白了,首先我要做一个关于列的groupby place
以及 key
,然后我必须根据索引对数组元素进行平均。例如,lion1是数组中的第一个索引元素 [ 34, 23, 56 ]
以及 [ 38, 30, 50 ]
.
我已经有了一个使用 posexplode
,但问题出在真实数据上 weights
数组列大小非常大,因为 posexplode
增加了更多的行,数据量从1000万行大幅增加到12亿行,在当前集群上无法在可靠的时间内进行计算。
我认为最好是添加更多的列而不是行,然后取消对列的分割,但是我不知道如何使用pyspark或sparksql2.2.1实现这一点。
2条答案
按热度按时间kmpatx3s1#
一种选择是合并所有
array
对于给定的位置,键组合成一个数组。在这个数组上,可以使用udf
计算期望的平均值,最后posexplode
以得到期望的结果。qyswt5oh2#
可以通过functions.size()查找数组列中的最大元素数,然后展开该列:
设置数据
在数组字段“average”中查找最大元素数
将数组列转换为n列
计算新列上的平均聚合
使用select+union+reduce取消拆分列