我使用pyspark将Dataframe保存为Parquet文件或csv文件,如下所示:
def write_df_as_parquet_file(df, path, mode="overwrite"):
df = df.repartition(1) # join partitions to produce 1 parquet file
dfw = df.write.format("parquet").mode(mode)
dfw.save(path)
def write_df_as_csv_file(df, path, mode="overwrite", header=True):
df = df.repartition(1) # join partitions to produce 1 csv file
header = "true" if header else "false"
dfw = df.write.format("csv").option("header", header).mode(mode)
dfw.save(path)
但这会将parquet/csv文件保存在一个名为 path
,以这种方式保存一些我们不需要的其他文件:
图片:https://ibb.co/9c1d8rl
基本上,我想创建一些函数,使用上述方法将文件保存到一个位置,然后将csv或parquet文件移动到一个新位置。比如:
def write_df_as_parquet_file(df, path, mode="overwrite"):
# save df in one file inside tmp_folder
df = df.repartition(1) # join partitions to produce 1 parquet file
dfw = df.write.format("parquet").mode(mode)
tmp_folder = path + "TEMP"
dfw.save(tmp_folder)
# move parquet file from tmp_folder to path
copy_file(tmp_folder + "*.parquet", path)
remove_folder(tmp_folder)
我该怎么做?如何实施 copy_file
或者 remove_folder
? 我在scala中看到了一些解决方案,它们使用hadoopapi来实现这一点,但是我无法在python中实现这一点。我想我需要使用sparkcontext,但我仍然在学习hadoop,还没有找到方法。
1条答案
按热度按时间vcirk6k61#
您可以使用python的hdfs库之一连接到hdfs示例,然后执行所需的任何操作。
来自hdfs3文档(https://hdfs3.readthedocs.io/en/latest/quickstart.html):
将上面的内容 Package 成一个函数,就可以开始了。
注意:我刚刚以hdfs3为例。也可以使用hdfscli。