如何通过pyspark将csv文件写入一个文件

mpgws1up  于 2023-03-01  发布在  Spark
关注(0)|答案(4)|浏览(194)

我用this方法写csv文件,但是会生成一个有多个part文件的文件,这不是我想要的;我需要它在一个文件中,而且我还发现another post使用scala强制所有东西在一个分区上计算,然后得到一个文件。

第一个问题:如何在Python中实现这一点?

在第二篇文章中,还提到Hadoop function可以将多个文件合并为一个文件。

第二个问题:是否可以在Spark中合并两个文件?

eqzww0vc

eqzww0vc1#

你可以用,

df.coalesce(1).write.csv('result.csv')

**注意:**使用合并函数时,将丢失并行性。

jpfvwuh4

jpfvwuh42#

你可以使用下面的cat命令行函数来完成这个操作。这会将所有的部分文件连接成1个csv。不需要重新分区成1个分区。

import os
test.write.csv('output/test')
os.system("cat output/test/p* > output/test.csv")
qlckcl4x

qlckcl4x3#

要求是通过将RDD带到一个执行程序来将RDD保存在一个CSV文件中。这意味着跨执行程序的RDD分区将被 Shuffle 到一个执行程序。我们可以使用coalesce(1)repartition(1)来实现此目的。除此之外,还可以向生成的csv文件添加列标题。首先,我们可以保留一个实用函数,以使数据与csv兼容。

def toCSVLine(data):
    return ','.join(str(d) for d in data)

假设MyRDD有五列,需要'ID','DT_KEY','Grade','Score','TRF_Age'作为列标题,所以我创建了一个标题RDD和如下所示的联合MyRDD,大多数情况下标题都在csv文件的顶部。

unionHeaderRDD = sc.parallelize( [( 'ID','DT_KEY','Grade','Score','TRF_Age' )])\
   .union( MyRDD )

unionHeaderRDD.coalesce( 1 ).map( toCSVLine ).saveAsTextFile("MyFileLocation" )

可以使用saveAsPickleFile spark context API方法来序列化为了节省空间而保存的数据。使用pickFile来读取pickle文件。

vuktfyat

vuktfyat4#

我需要我的csv输出在一个单独的文件中,头文件保存到一个s3桶中,文件名是我提供的,当我运行它(在数据库集群上运行spark 3.3.1)时,当前接受的答案给了我一个文件夹,文件名是我想要的,里面有一个csv文件(由于coalesce(1)),文件名是随机的,没有头文件。
我发现把它作为中间步骤发送给Pandas只提供了一个带有头的文件,完全符合预期。

my_spark_df.toPandas().to_csv('s3_csv_path.csv',index=False)

相关问题