如何基于两列订购spark rdd

k7fdbhmy  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(399)

我有以下rdd数据集:

ABC [G4, G3, G1]    3
FFF [G5, G4, G3]    3
CDE [G5,G4,G3,G2]   4
XYZ [G4, G3]    2

需要先按最后一列说明排序,如果最后一列相同,则按第一个元组项说明顺序排序。预期结果是

CDE [G5,G4,G3,G2]   4
FFF [G5, G4, G3]    3
ABC [G4, G3, G1]    3
XYZ [G4, G3]    2

提前谢谢。

ifmq2ha2

ifmq2ha21#

我不知道为什么上面的答案不起作用。我觉得很好。试试这个代码。
以下是我的意见:

i1,array1,10
i5,array2,50
i4,array3,20
i2,array4,20

代码:

val idRDD = sc.textFile(inputPath)

val idSOrted = idRDD.map { rec => ((rec.split(",")(2),rec.split(",")(0)),(rec.split(",")(1))) }.sortByKey(false).map(rec=>(rec._1._1,rec._2,rec._1._2))

这是订单号:

(50,array2,i5)
(20,array3,i4)
(20,array4,i2)
(10,array1,i1)
soat7uwm

soat7uwm2#

你可以用 sortBy :

rdd.sortBy(r => (r._3, r._2(0)), false)

在上面, r._3 代表最后一列, r._2(0) 对于第二列(数组)的第一个元素 false 指定顺序应为降序。但是请记住,由于洗牌,排序是一个昂贵的操作。
更新
如果我们假设你从一个 pair rdd :

/// Generate data
val rdd = sc.parallelize(Seq(("ABC","G4"),("ABC","G3"),
                             ("ABC","G1"),("FFF","G5"),
                             ("FFF","G4"),("FFF","G3"),
                             ("CDE","G5"),("CDE","G4"),                             
                             ("CDE","G3"),("CDE","G2"),
                             ("XYZ","G4"),("XYZ","G3")))

/// Put values in a list and calculate its size
val rdd_new = rdd.groupByKey.mapValues(_.toList).map(x => (x._1, x._2, x._2.size))

/// Now this works
rdd_new.sortBy(r => (r._3, r._2(0)), false).collect()
/// Array[(String, List[String], Int)] = Array((CDE,List(G5, G4, G3, G2),4), (FFF,List(G5, G4, G3),3), (ABC,List(G4, G3, G1),3), (XYZ,List(G4, G3),2))

相关问题