如何在Spark RDD中从特定分区获取数据?

7gcisfzg  于 2023-01-31  发布在  Apache
关注(0)|答案(2)|浏览(231)

我想从Spark RDD中的一个特定分区访问数据。我可以按如下方式获得分区地址:

myRDD.partitions(0)

但是我想从myRDD.partitions(0)分区获取数据。我试过官方的org.apache.spark文档,但是找不到。
先谢了。

zsbz8rwp

zsbz8rwp1#

可以按如下方式使用mapPartitionsWithIndex

// Create (1, 1), (2, 2), ..., (100, 100) dataset
// and partition by key so we know what to expect
val rdd = sc.parallelize((1 to 100) map (i => (i, i)), 16)
  .partitionBy(new org.apache.spark.HashPartitioner(8))

val zeroth = rdd
  // If partition number is not zero ignore data
  .mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter else Iterator())

// Check if we get expected results 8, 16, ..., 96
assert (zeroth.keys.map(_ % 8 == 0).reduce(_ & _) & zeroth.count == 12)
e4yzc0pl

e4yzc0pl2#

最简单的方法是使用glom()函数,该函数遍历每个分区并将所有元素放入数组中,然后返回每个分区的元素数组的新RDD,其中每个数组是单独的分区。
假设我们有RDD,数据分布在5个分区中:

val rdd = sc.parallelize(1 to 20, 5)

执行rdd.glom.collect将打印:

Array[Array[Int]] = Array(
   Array(1, 2, 3, 4), 
   Array(5, 6, 7, 8),
   Array(9, 10, 11, 12), 
   Array(13, 14, 15, 16),
   Array(17, 18, 19, 20)
)

其中每个内部数组的位置是它的分区号。例如Array(1, 2, 3, 4)属于第零个分区,Array(5, 6, 7, 8)属于第一个分区,等等。

相关问题