spark过滤器和计数大rdd多次

ki1q1bka  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(479)

假设我有一个rdd[(string,int)],如下例所示:

(A, 0)
(B, 0)
(C, 1)
(D, 0)
(E, 2)
(F, 1)
(G, 1)
(H, 3)
(I, 2)
(J, 0)
(K, 3)

我想有效地打印包含0、1、2等的记录的总量。由于rdd包含数百万个条目,所以我希望尽可能高效地打印。
此示例的输出将返回如下内容:

Number of records containing 0 = 4
Number of records containing 1 = 3
Number of records containing 2 = 2
Number of records containing 3 = 2

目前,我尝试在大型rdd上执行一个过滤器,然后 count() 对于0,1,2,。。分开。我用的是scala。
有没有更有效的方法?我已经缓存了rdd,但我的程序仍然内存不足(我已经将驱动程序内存设置为5g)。
编辑:根据tzach的建议,我现在使用 countByKey :

rdd.map(_.swap).countByKey()

我是否可以通过将字符串值更改为一个元组(其中第二个元素是“m”或“f”)来细化这个值,然后获得这个元组的第二个元素的每个唯一值的每个键的计数?
例如:

(A,m), 0)
(B,f), 0)
(C,m), 1)
(D,m), 0)
(E,f), 2)
(F,f), 1)
(G,m), 1)
(H,m), 3)
(I,f), 2)
(J,f), 0)
(K,m), 3)

会导致

((0,m), 2)
((0,f), 2)
((1,m), 2)
((1,f), 1)
((2,m), 0)
((2,f), 2)
((3,m), 2)
((3,f), 0)

提前谢谢!

cidc1ykv

cidc1ykv1#

你可以使用方便的 countByKey 为此-只需事先交换输入中的位置,使数值成为键:

val rdd = sc.parallelize(Seq(
  ("A", 0), ("B", 0), ("C", 1), ("D", 0), ("E", 2),
  ("F", 1), ("G", 1), ("H", 3), ("I", 2), ("J", 0), ("K", 3)
))

rdd.map(_.swap).countByKey().foreach(println)
// (0,4)
// (1,3)
// (3,2)
// (2,2)

编辑: countByKey 完全按照它听起来的样子-所以不管你想用什么键,只要把你的rdd转换成元组的左边部分,例如:

rdd.map { case ((a, b), i) => ((i, b), a) }.countByKey()

或:

rdd.keyBy { case ((_, b), i) => (i, b) }.countByKey()

相关问题