pyspark:如何将Dataframe分割成块并保存它们?

ndasle7k  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(1560)

我要分一杯羹 pyspark Dataframe df 保存不同的块。
这就是我要做的:我定义一个列 id_tmp 我根据这个来分割Dataframe。

chunk = 10000
  id1 = 0
  id2 = chunk
  df = df.withColumn('id_tmp', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
  c = df.count()
  while id1 < c:
    stop_df = df.filter( (tmp.id_tmp < id2) & (tmp.id_tmp >= id1))
    stop_df.write.format('com.databricks.spark.csv').save('myFolder/')
    id1+=chunk
    id2+=chunk

有没有更有效的方法不用定义列 id_tmp

epfja78i

epfja78i1#

我建议你使用 partitionBy 方法从 DataFrameWriter 接口内置spark(docs)。下面是一个例子。
鉴于 df 在dataframe中,chuck标识符必须是一列或多列。在我的例子中 tmp_id . 下面的代码段生成一个df,其中包含12条记录和4个块id。

import pyspark.sql.functions as F
df = spark.range(0, 12).withColumn("id_tmp", F.col("id") % 4).orderBy("id_tmp")
df.show()

退货:

+---+------+
| id|id_tmp|
+---+------+
|  8|     0|
|  0|     0|
|  4|     0|
|  1|     1|
|  9|     1|
|  5|     1|
|  6|     2|
|  2|     2|
| 10|     2|
|  3|     3|
| 11|     3|
|  7|     3|
+---+------+

要独立地保存每个块,您需要:

(df
 .repartition("id_tmp")
 .write
 .partitionBy("id_tmp")
 .mode("overwrite")
 .format("csv")
 .save("output_folder"))
``` `repartition` 将洗牌记录,使每个节点有一个完整的一组记录的一个“id\u tmp”值。然后,每个块都用 `partitionBy` .
生成的文件夹结构:

output_folder/
output_folder/._SUCCESS.crc
output_folder/id_tmp=0
output_folder/id_tmp=0/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=0/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=1
output_folder/id_tmp=1/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=1/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=2
output_folder/id_tmp=2/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=2/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/id_tmp=3
output_folder/id_tmp=3/.part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv.crc
output_folder/id_tmp=3/part-00000-eba244a4-ce95-4f4d-b9b8-8e5f972b144f.c000.csv
output_folder/_SUCCESS

分区的大小和数量对于spark的性能非常重要。不要对数据集进行过多的分区,要有合理的文件大小(比如每个文件1gb),特别是在使用云存储服务的情况下。如果要在加载时过滤数据(即:year=yyyy/month=mm/day=dd),也建议使用分区变量

相关问题