在pyspark中使用foreachpartition()函数时,如何知道哪个分区当前正在运行?

axkjgtzd  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(568)

我需要将分区保存到文本文件中,每个分区使用不同的名称。但是在代码段下面运行时,只有一个文件通过覆盖上一个分区来保存。

  1. def chunks(iterator):
  2. chunks.counter += 1
  3. l = (list(iterator))
  4. df = pd.DataFrame(l,index=None)
  5. df.to_csv(parent_path+"C"+str(chunks.counter+1)+".txt", header=None, index=None, sep=' ')
  6. chunks.counter=0
  7. sc.parallelize([1,2,3,4,5,6],num_partions).foreachPartition(chunks)

有没有什么方法可以让我知道pyspark中当前运行的是哪个分区?

nszi6y05

nszi6y051#

  1. def chunks(lst, n):
  2. # Yield successive n-sized chunks from the lst...
  3. for i in (range(0, len(lst), n)):
  4. yield i, lst[i:i + n]
  5. for (index, values) in chunks(range(0, 1e5), 1e3): # change this to int's as per your need otherwise it will give float error or will write range obj itself..
  6. with open(f"{parent_path}_C_{index}.txt", "w") as output:
  7. output.write(str(values)) # converting to str

甚至可以轻松地将其 Package 到joblib中;)在我看来,我们不需要Pypark。。

相关问题