spark:scala-如何将集合从rdd转换为另一个rdd

afdcj2ne  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(415)

如何转换调用后返回的集合 take(5) 到另一个rdd以便我可以保存输出文件中的前5条记录?
如果我使用 saveAsTextfile 它不让我使用 take 以及 saveAsTextFile 一起(这就是为什么你看到下面的评论)。它按排序顺序存储rdd中的所有记录,因此前5个记录是前5个国家/地区,但我只想存储前5个记录-是否可以将集合[take(5)]转换为rdd?

val Strips =  txtFileLines.map(_.split(","))
                         .map(line => (line(0) + "," + (line(7).toInt + line(8).toInt)))
                         .sortBy(x => x.split(",")(1).trim().toInt, ascending=false)
                         .take(5)
                       //.saveAsTextFile("output\\country\\byStripsBar")

解决方案: sc.parallelize(Strips, 1).saveAsTextFile("output\\country\\byStripsBar")

xjreopfe

xjreopfe1#

除非你真的需要 saveAsTextFile 格式化,我只需要打印 take(5) 使用简单io输出到文件(如 File ).
否则,这就是罗嗦 RDD 唯一解决方案:

scala> val rdd = sc.parallelize(5 to 1 by -1 map{x => (x, x*x)})
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[71] at parallelize at <console>:27

scala> rdd.collect
res1: Array[(Int, Int)] = Array((5,25), (4,16), (3,9), (2,4), (1,1))

scala> val top2 = rdd.sortBy(_._1).zipWithIndex.collect{case x if (x._2 < 2) => x._1}
top2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at collect at <console>:29

scala> top2.collect
res2: Array[(Int, Int)] = Array((1,1), (2,4))
9rbhqvlz

9rbhqvlz2#

val rowsArray: Array[Row] = rdd.take(5)
val slicedRdd = sparkContext.parallelize(rowsArray, 1)

slicedRdd.savesTextFile("specify path here")

相关问题