scala—高效地实现spark的takebykey

n53p2ov0  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(410)

我有一个rdd类型 RDD[(k:Int,v:String)] . 我想每个键最多有1000个元组 k 所以我有 [(k,v)] 没有键出现超过1000次。有没有一种方法可以避免先调用groupby的性能损失?我想不出一个好的方法来聚合这些值,以避免一个完整的groupby导致我的工作失败。
天真的方法:

  1. def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
  2. rdd.groupBy(_._1).mapValues(_.take(n)).flatMap(_._2)
  3. }

我正在寻找一种更有效的方法来避免分组:

  1. takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
  2. //use reduceByKey, foldByKey, etc..??
  3. }

这是我迄今为止开发的最好的解决方案,但它不进行类型检查。。

  1. def takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
  2. rdd.foldByKey(List[V](), ((acc, elem) => if (acc.length >= n) acc else elem._2 :: acc)).flatMap(t => t._2.map(v => (t._1, v)))
  3. }

编辑。我已经想出了一个稍微好一点的解决方案,看起来很管用:

  1. takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
  2. rdd.mapValues(List(_))
  3. .reduceByKey((x,y) => if(x.length >= n) x
  4. else if(y.length >= n) y
  5. else (x ++ y).take(n))
  6. .flatMap(t => t._2.map(v => (t._1, v)))
  7. }
50pmv0ei

50pmv0ei1#

您当前的解决方案是朝着正确方向迈出的一步,但它仍然非常低效,原因至少有三个: mapValues(List(_)) 创建大量临时 List 物体 length 对于线性 Seq 就像 List 是o(n) x ++ y 再次创建大量临时对象
最简单的方法是替换 List 具有可变缓冲区和恒定时间 length . 一个可能的例子如下:

  1. import scala.collection.mutable.ArrayBuffer
  2. rdd.aggregateByKey(ArrayBuffer[Int]())(
  3. (acc, x) => if (acc.length >= n) acc else acc += x,
  4. (acc1, acc2) => {
  5. val (xs, ys) = if (acc1.length > acc2.length) (acc1, acc2) else (acc2, acc1)
  6. val toTake = Math.min(n - xs.length, ys.length)
  7. for (i <- 0 until toTake) {
  8. xs += ys(i)
  9. }
  10. xs
  11. }
  12. )

在旁注上,您可以替换:

  1. .flatMap(t => t._2.map(v => (t._1, v)))

具有

  1. .flatMapValues(x => x) // identity[Seq[V]]

它不会影响性能,但稍微干净一点。

展开查看全部
c86crjj0

c86crjj02#

这是迄今为止我想出的最好的解决办法

  1. takeByKey(rdd: RDD[(K,V)], n: Int) : RDD[(K,V)] = {
  2. rdd.mapValues(List(_))
  3. .reduceByKey((x,y) => if(x.length >= n) x
  4. else if(y.length >= n) y
  5. else (x ++ y).take(n))
  6. .flatMap(t => t._2.map(v => (t._1, v)))
  7. }

它不会像groupbykey方法那样耗尽内存并死掉,但仍然很慢。

相关问题