如何从python复制pyspark/hadoop中的文件

ybzsozfc  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(622)

我使用pyspark将Dataframe保存为Parquet文件或csv文件,如下所示:

  1. def write_df_as_parquet_file(df, path, mode="overwrite"):
  2. df = df.repartition(1) # join partitions to produce 1 parquet file
  3. dfw = df.write.format("parquet").mode(mode)
  4. dfw.save(path)
  5. def write_df_as_csv_file(df, path, mode="overwrite", header=True):
  6. df = df.repartition(1) # join partitions to produce 1 csv file
  7. header = "true" if header else "false"
  8. dfw = df.write.format("csv").option("header", header).mode(mode)
  9. dfw.save(path)

但这会将parquet/csv文件保存在一个名为 path ,以这种方式保存一些我们不需要的其他文件:

图片:https://ibb.co/9c1d8rl
基本上,我想创建一些函数,使用上述方法将文件保存到一个位置,然后将csv或parquet文件移动到一个新位置。比如:

  1. def write_df_as_parquet_file(df, path, mode="overwrite"):
  2. # save df in one file inside tmp_folder
  3. df = df.repartition(1) # join partitions to produce 1 parquet file
  4. dfw = df.write.format("parquet").mode(mode)
  5. tmp_folder = path + "TEMP"
  6. dfw.save(tmp_folder)
  7. # move parquet file from tmp_folder to path
  8. copy_file(tmp_folder + "*.parquet", path)
  9. remove_folder(tmp_folder)

我该怎么做?如何实施 copy_file 或者 remove_folder ? 我在scala中看到了一些解决方案,它们使用hadoopapi来实现这一点,但是我无法在python中实现这一点。我想我需要使用sparkcontext,但我仍然在学习hadoop,还没有找到方法。

vcirk6k6

vcirk6k61#

您可以使用python的hdfs库之一连接到hdfs示例,然后执行所需的任何操作。
来自hdfs3文档(https://hdfs3.readthedocs.io/en/latest/quickstart.html):

  1. from hdfs3 import HDFileSystem
  2. hdfs = HDFileSystem(host=<host>, port=<port>)
  3. hdfs.mv(tmp_folder + "*.parquet", path)

将上面的内容 Package 成一个函数,就可以开始了。
注意:我刚刚以hdfs3为例。也可以使用hdfscli。

相关问题