如何将pyspark Dataframe 保存在单个csv文件中

3phpmpom  于 2023-08-03  发布在  Spark
关注(0)|答案(2)|浏览(155)

这是how to save dataframe into csv pyspark线程的延续。
我正在尝试将我的pyspark Dataframe df保存到pyspark 3.0.1中。所以我写了

df.coalesce(1).write.csv('mypath/df.csv)

字符串
但是在执行这个命令后,我看到我的路径中有一个名为df.csv的文件夹,其中包含4个以下文件

1._committed_..
2._started_...
3._Success  
4. part-00000-.. .csv


你能建议我如何保存所有数据在df.csv

xa9qqrwz

xa9qqrwz1#

您可以使用.coalesce(1)将文件保存在一个csv分区中,然后重命名此csv并将其移动到所需的文件夹中。
下面是一个实现此功能的函数:
df:您的df
fileName:您要为csv文件指定的名称
filePath:您要保存的文件夹

def export_csv(df, fileName, filePath):
  
  filePathDestTemp = filePath + ".dir/" 

  df\
    .coalesce(1)\
    .write\
    .csv(filePathDestTemp) # use .csv to save as csv

  listFiles = dbutils.fs.ls(filePathDestTemp)
  for subFiles in listFiles:
    if subFiles.name[-4:] == ".csv":
      
      dbutils.fs.cp (filePathDestTemp + subFiles.name,  filePath + fileName+ '.csv')

  dbutils.fs.rm(filePathDestTemp, recurse=True)

字符串

bpzcxfmw

bpzcxfmw2#

如果你想得到一个名为df.csv的文件作为输出,你可以先写入一个临时文件夹,然后移动Spark生成的零件文件并重命名它。
这些步骤可以通过JVM网关使用Hadoop FileSystem API完成:

temp_path = "mypath/__temp"
target_path = "mypath/df.csv"

df.coalesce(1).write.mode("overwrite").csv(temp_path)

Path = sc._gateway.jvm.org.apache.hadoop.fs.Path

# get the part file generated by spark write
fs = Path(temp_path).getFileSystem(sc._jsc.hadoopConfiguration())
csv_part_file = fs.globStatus(Path(temp_path + "/part*"))[0].getPath()

# move and rename the file
fs.rename(csv_part_file, Path(target_path))
fs.delete(Path(temp_path), True)

字符串

相关问题