我正在努力学习python中的spark,我被困在 combineByKey
用于平均键值对中的值。事实上,我的困惑不在于 combineByKey
语法,但后面会发生什么。典型的例子(来自o'rielly 2015 learning spark book)可以在许多地方的网上看到;这里有一个。
问题在于 sumCount.map(lambda (key, (totalSum, count)): (key, totalSum / count)).collectAsMap()
声明。使用spark2.0.1和ipython3.5.2,会引发语法错误异常。把它简化成应该有用的东西(这是o'reilly书中的内容): sumCount.map(lambda key,vals: (key, vals[0]/vals[1])).collectAsMap()
导致spark变得疯狂,java例外,但我注意到 TypeError: <lambda>() missing 1 required positional argument: 'v'
错误。
有人能给我举一个这个功能的例子,它实际上可以与spark&python的最新版本一起使用吗?为了完整起见,我将自己的最小工作(或者更确切地说,非工作)示例包括在内:
In: pRDD = sc.parallelize([("s",5),("g",3),("g",10),("c",2),("s",10),("s",3),("g",-1),("c",20),("c",2)])
In: cbk = pRDD.combineByKey(lambda x:(x,1), lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]))
In: cbk.collect()
Out: [('s', (18, 3)), ('g', (12, 3)), ('c', (24, 3))]
In: cbk.map(lambda key,val:(k,val[0]/val[1])).collectAsMap() <-- errors
计算起来很容易 [(e[0],e[1][0]/e[1][1]) for e in cbk.collect()]
,但我宁愿用“斯巴克”的方式。
2条答案
按热度按时间qybjjes11#
一步一步地:
lambda (key, (totalSum, count)): ...
是所谓的元组参数解包,它在python中已被删除。RDD.map
接受一个期望为单参数的函数。您尝试使用的函数:是一个需要两个参数而不是一个参数的函数。2.x语法的有效翻译是
或:
你也可以用
mapValues
:最后,数值稳定的解决方案是:
uemypmqf2#
可以使用窗口概念对特定列值求平均值。考虑以下代码:
将产生:
平均聚合是在每个worker上分别完成的,不需要往返于主机,因此效率很高。