python spark combinebykey平均值

fbcarpbf  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(421)

我正在努力学习python中的spark,我被困在 combineByKey 用于平均键值对中的值。事实上,我的困惑不在于 combineByKey 语法,但后面会发生什么。典型的例子(来自o'rielly 2015 learning spark book)可以在许多地方的网上看到;这里有一个。
问题在于 sumCount.map(lambda (key, (totalSum, count)): (key, totalSum / count)).collectAsMap() 声明。使用spark2.0.1和ipython3.5.2,会引发语法错误异常。把它简化成应该有用的东西(这是o'reilly书中的内容): sumCount.map(lambda key,vals: (key, vals[0]/vals[1])).collectAsMap() 导致spark变得疯狂,java例外,但我注意到 TypeError: <lambda>() missing 1 required positional argument: 'v' 错误。
有人能给我举一个这个功能的例子,它实际上可以与spark&python的最新版本一起使用吗?为了完整起见,我将自己的最小工作(或者更确切地说,非工作)示例包括在内:

In: pRDD = sc.parallelize([("s",5),("g",3),("g",10),("c",2),("s",10),("s",3),("g",-1),("c",20),("c",2)])
In: cbk = pRDD.combineByKey(lambda x:(x,1), lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]))
In: cbk.collect()
Out: [('s', (18, 3)), ('g', (12, 3)), ('c', (24, 3))]
In: cbk.map(lambda key,val:(k,val[0]/val[1])).collectAsMap() <-- errors

计算起来很容易 [(e[0],e[1][0]/e[1][1]) for e in cbk.collect()] ,但我宁愿用“斯巴克”的方式。

qybjjes1

qybjjes11#

一步一步地: lambda (key, (totalSum, count)): ... 是所谓的元组参数解包,它在python中已被删除。 RDD.map 接受一个期望为单参数的函数。您尝试使用的函数:

lambda key, vals: ...

是一个需要两个参数而不是一个参数的函数。2.x语法的有效翻译是

lambda key_vals: (key_vals[0], key_vals[1][0] / key_vals[1][1])

或:

def get_mean(key_vals):
    key, (total, cnt) = key_vals
    return key, total / cnt

cbk.map(get_mean)

你也可以用 mapValues :

cbk.mapValues(lambda x: x[0] / x[1])

最后,数值稳定的解决方案是:

from pyspark.statcounter import StatCounter

(pRDD
    .combineByKey(
        lambda x: StatCounter([x]),
        StatCounter.merge,
        StatCounter.mergeStats)
    .mapValues(StatCounter.mean))
uemypmqf

uemypmqf2#

可以使用窗口概念对特定列值求平均值。考虑以下代码:

import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([('a', 2), ('b', 3), ('a', 6), ('b', 5)],
                           ['a', 'i'])
win = Window.partitionBy('a')
df.withColumn('avg', F.avg('i').over(win)).show()

将产生:

+---+---+---+
|  a|  i|avg|
+---+---+---+
|  b|  3|4.0|
|  b|  5|4.0|
|  a|  2|4.0|
|  a|  6|4.0|
+---+---+---+

平均聚合是在每个worker上分别完成的,不需要往返于主机,因此效率很高。

相关问题