我有一个rdd类型 RDD[(k:Int,v:String)]
. 我想每个键最多有1000个元组 k
所以我有 [(k,v)]
没有键出现超过1000次。有没有一种方法可以避免先调用groupby的性能损失?我想不出一个好的方法来聚合这些值,以避免一个完整的groupby导致我的工作失败。
天真的方法:
def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.groupBy(_._1).mapValues(_.take(n)).flatMap(_._2)
}
我正在寻找一种更有效的方法来避免分组:
takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
//use reduceByKey, foldByKey, etc..??
}
这是我迄今为止开发的最好的解决方案,但它不进行类型检查。。
def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.foldByKey(List[V](), ((acc, elem) => if (acc.length >= n) acc else elem._2 :: acc)).flatMap(t => t._2.map(v => (t._1, v)))
}
编辑。我已经想出了一个稍微好一点的解决方案,看起来很管用:
takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
rdd.mapValues(List(_))
.reduceByKey((x,y) => if(x.length >= n) x
else if(y.length >= n) y
else (x ++ y).take(n))
.flatMap(t => t._2.map(v => (t._1, v)))
}
2条答案
按热度按时间50pmv0ei1#
您当前的解决方案是朝着正确方向迈出的一步,但它仍然非常低效,原因至少有三个:
mapValues(List(_))
创建大量临时List
物体length
对于线性Seq
就像List
是o(n)x ++ y
再次创建大量临时对象最简单的方法是替换
List
具有可变缓冲区和恒定时间length
. 一个可能的例子如下:在旁注上,您可以替换:
具有
它不会影响性能,但稍微干净一点。
c86crjj02#
这是迄今为止我想出的最好的解决办法
它不会像groupbykey方法那样耗尽内存并死掉,但仍然很慢。