如何在pyspark中基于索引找到数组列的平均值

ufj5ltwl  于 2021-06-24  发布在  Hive
关注(0)|答案(2)|浏览(310)

我有如下数据

-----------------------------
place  | key        | weights
----------------------------
amazon | lion       | [ 34, 23, 56 ]
north  | bear       | [ 90, 45]
amazon | lion       | [ 38, 30, 50 ]
amazon | bear       | [ 45 ]
amazon | bear       | [ 40 ]

我试图得到如下结果

-----------------------------
place  | key        | average
----------------------------
amazon | lion1      | 36.0      #(34 + 38)/2
amazon | lion2      | 26.5      #(23 + 30)/2
amazon | lion3      | 53.0      #(50 + 56)/2
north  | bear1      | 90        #(90)/1
north  | bear2      | 45        #(45)/1
amazon | bear1      | 42.5      #(45 + 40)/2

我明白了,首先我要做一个关于列的groupby place 以及 key ,然后我必须根据索引对数组元素进行平均。例如,lion1是数组中的第一个索引元素 [ 34, 23, 56 ] 以及 [ 38, 30, 50 ] .
我已经有了一个使用 posexplode ,但问题出在真实数据上 weights 数组列大小非常大,因为 posexplode 增加了更多的行,数据量从1000万行大幅增加到12亿行,在当前集群上无法在可靠的时间内进行计算。
我认为最好是添加更多的列而不是行,然后取消对列的分割,但是我不知道如何使用pyspark或sparksql2.2.1实现这一点。

kmpatx3s

kmpatx3s1#

一种选择是合并所有 array 对于给定的位置,键组合成一个数组。在这个数组上,可以使用 udf 计算期望的平均值,最后 posexplode 以得到期望的结果。

from pyspark.sql.functions import collect_list,udf,posexplode,concat
from pyspark.sql.types import ArrayType,DoubleType

# Grouping by place,key to get an array of arrays

grouped_df = df.groupBy(df.place,df.key).agg(collect_list(df.weights).alias('all_weights'))

# Define UDF

zip_mean = udf(lambda args: [sum(i)/len(i) for i in zip(*args)],ArrayType(DoubleType()))

# Apply UDF on the array of array column

res = grouped_df.select('*',zip_mean(grouped_df.all_weights).alias('average'))

# POS explode to explode the average values and get the position for key concatenation

res = res.select('*',posexplode(res.average))

# Final result

res.select(res.place,concat(res.key,res.pos+1).alias('key'),res.col).show()
qyswt5oh

qyswt5oh2#

可以通过functions.size()查找数组列中的最大元素数,然后展开该列:
设置数据

from pyspark.sql import functions as F

df = spark.createDataFrame([    
      ('amazon', 'lion', [ 34, 23, 56 ])
    , ('north',  'bear', [ 90, 45])
    , ('amazon', 'lion', [ 38, 30, 50 ])
    , ('amazon', 'bear', [ 45 ])    
    , ('amazon', 'bear', [ 40 ])
], ['place', 'key', 'average'])

在数组字段“average”中查找最大元素数

n = df.select(F.max(F.size('average')).alias('n')).first().n

>>> n
3

将数组列转换为n列

df1 = df.select('place', 'key', *[F.col('average')[i].alias('val_{}'.format(i+1)) for i in range(n)])

>>> df1.show()
+------+----+-----+-----+-----+
| place| key|val_1|val_2|val_3|
+------+----+-----+-----+-----+
|amazon|lion|   34|   23|   56|
| north|bear|   90|   45| null|
|amazon|lion|   38|   30|   50|
|amazon|bear|   45| null| null|
|amazon|bear|   40| null| null|
+------+----+-----+-----+-----+

计算新列上的平均聚合

df2 = df1.groupby('place', 'key').agg(*[ F.mean('val_{}'.format(i+1)).alias('average_{}'.format(i+1)) for i in range(n)])

>>> df2.show()
+------+----+---------+---------+---------+
| place| key|average_1|average_2|average_3|
+------+----+---------+---------+---------+
|amazon|bear|     42.5|     null|     null|
| north|bear|     90.0|     45.0|     null|
|amazon|lion|     36.0|     26.5|     53.0|
+------+----+---------+---------+---------+

使用select+union+reduce取消拆分列

from functools import reduce

df_new = reduce(lambda x,y: x.union(y), [
    df2.select('place', F.concat('key', F.lit(i+1)).alias('key'), F.col('average_{}'.format(i+1)).alias('average')) \
       .dropna(subset=['average']) for i in range(n)
])

>>> df_new.show()
+------+-----+-------+
| place|  key|average|
+------+-----+-------+
|amazon|bear1|   42.5|
| north|bear1|   90.0|
|amazon|lion1|   36.0|
| north|bear2|   45.0|
|amazon|lion2|   26.5|
|amazon|lion3|   53.0|
+------+-----+-------+

相关问题