pyspark将两个Dataframe写入同一分区,但以文件夹分隔

5sxhfpxr  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(410)

我使用spark将两个不同的Dataframe写入同一个分区,但是我希望它们在分区的末尾被文件夹隔开。i、 e.第一个Dataframe将写入 yyyy/mm/dd/ 第二个会写信给 yyyy/mm/dd/rejected/ 目前,我可以将第一个Dataframe写入 yyyy/mm/dd/ 以及第二Dataframe rejected/yyyy/mm/dd 使用以下代码:

first_df.repartition('year', 'month', 'day').write \
    .partitionBy('year', 'month', 'day') \
    .mode("append") \
    .csv(f"{output_path}/")

  second_df.repartition('year', 'month', 'day').write \
    .partitionBy('year', 'month', 'day') \
    .mode("append") \
    .csv(f"{output_path}/rejected")

有什么建议吗

o2gm4chl

o2gm4chl1#

添加 rejected 作为文本值 second_df 然后包括在 partitionBy

second_df.withColumn("rej",lit("rejected")) \
    .repartition('year', 'month', 'day').write \
    .partitionBy('year', 'month', 'day','rej') \
    .mode("append") \
    .csv(f"{output_path}")

另一种方法是使用 hadoop file api 将文件移到受尊重的目录中。
Update: Rename the directory: ```
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

fs = FileSystem.get(URI("hdfs://<name_node>:8020"), Configuration())

rename the directory

fs.rename(Path(f'{output_path}/rej=rejected'),Path(f'{output_path}/rejected'))

相关问题