pyspark:无序rdd

mitkmikd  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(535)

我试图随机排列rdd中元素的顺序。我目前的方法是用无序整数的rdd压缩元素,然后用这些整数连接。
然而,pyspark只包含100000000个整数。我正在使用下面的代码。
我的问题是:有没有更好的方法来压缩随机索引或以其他方式洗牌?
我试过按随机键排序,这很有效,但速度很慢。

  1. def random_indices(n):
  2. """
  3. return an iterable of random indices in range(0,n)
  4. """
  5. indices = range(n)
  6. random.shuffle(indices)
  7. return indices

pyspark中会发生以下情况:

  1. Using Python version 2.7.3 (default, Jun 22 2015 19:33:41)
  2. SparkContext available as sc.
  3. >>> import clean
  4. >>> clean.sc = sc
  5. >>> clean.random_indices(100000000)
  6. Killed
rqcrx0a6

rqcrx0a61#

一种可能的方法是使用 mapParitions ```
import os
import numpy as np

swap = lambda x: (x[1], x[0])

def add_random_key(it):
# make sure we get a proper random seed
seed = int(os.urandom(4).encode('hex'), 16)
# create separate generator
rs = np.random.RandomState(seed)
# Could be randint if you prefer integers
return ((rs.rand(), swap(x)) for x in it)

rdd_with_keys = (rdd

It will be used as final key. If you don't accept gaps

use zipWithIndex but this should be cheaper

.zipWithUniqueId()
.mapPartitions(add_random_key, preservesPartitioning=True))

  1. 接下来可以重新分区、对每个分区排序并提取值:

n = rdd.getNumPartitions()
(rdd_with_keys
# partition by random key to put data on random partition
.partitionBy(n)
# Sort partition by random value to ensure random order on partition
.mapPartitions(sorted, preservesPartitioning=True)
# Extract (unique_id, value) pairs
.values())

  1. 如果每个分区的排序仍然很慢,可以用fisher-yates shuffle来代替。
  2. 如果您只是需要一个随机数据,那么您可以使用 `mllib.RandomRDDs` ```
  3. from pyspark.mllib.random import RandomRDDs
  4. RandomRDDs.uniformRDD(sc, n)

理论上它可以通过输入压缩 rdd 但它需要匹配每个分区的元素数。

展开查看全部
hfwmuf9z

hfwmuf9z2#

Pypark成功了!

  1. from random import randrange
  2. data_rnd = data.sortBy(lambda x: randrange(1000000))

相关问题