// Create (1, 1), (2, 2), ..., (100, 100) dataset
// and partition by key so we know what to expect
val rdd = sc.parallelize((1 to 100) map (i => (i, i)), 16)
.partitionBy(new org.apache.spark.HashPartitioner(8))
val zeroth = rdd
// If partition number is not zero ignore data
.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter else Iterator())
// Check if we get expected results 8, 16, ..., 96
assert (zeroth.keys.map(_ % 8 == 0).reduce(_ & _) & zeroth.count == 12)
2条答案
按热度按时间zsbz8rwp1#
可以按如下方式使用
mapPartitionsWithIndex
e4yzc0pl2#
最简单的方法是使用
glom()
函数,该函数遍历每个分区并将所有元素放入数组中,然后返回每个分区的元素数组的新RDD,其中每个数组是单独的分区。假设我们有RDD,数据分布在5个分区中:
执行
rdd.glom.collect
将打印:其中每个内部数组的位置是它的分区号。例如
Array(1, 2, 3, 4)
属于第零个分区,Array(5, 6, 7, 8)
属于第一个分区,等等。