我使用下一个代码:
csv.saveAsTextFile(pathToResults, classOf[GzipCodec])
pathtoresults目录有许多文件,如part-0000、part-0001等。我可以使用fileutil.copymerge(),但它非常慢,需要在驱动程序上下载所有文件,然后在hadoop中上载它们。但是fileutil.copymerge()比:
csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])
如果不重新分区和fileutil.copymerge(),如何合并spark结果文件?
3条答案
按热度按时间sg24os4d1#
有着完全相同的问题,必须编写实现copymerge的pyspark代码(调用hadoop api):
https://github.com/tagar/stuff/blob/master/copymerge.py
不幸的是,copymerge作为一个独立的hadoopapi调用将在hadoop3.0中被弃用和删除。所以这个实现不依赖于hadoop的copymerge(它重新实现了它)。
bcs8qyzn2#
coalesce(1)工作正常。我还看到了hadoop流选项,如果您想运行以下脚本,它可以动态合并hdfs文件:
svmlkihl3#
不幸的是,在spark中没有其他方法可以获得单个输出文件。而不是
repartition(1)
你可以用coalesce(1)
,但带有参数1
他们的行为是一样的。spark会将数据收集到内存中的单个分区中,如果数据太大,可能会导致oom错误。在hdfs上合并文件的另一个选择可能是编写一个简单的mapreduce作业(或pig作业,或hadoop流作业),将整个目录作为输入,并使用单个reducer生成单个输出文件。但请注意,使用mapreduce方法时,所有数据都将首先复制到reducer本地文件系统,这可能会导致“空间不足”错误。
以下是一些关于同一主题的有用链接:
在reduce阶段后合并输出文件
合并hdfs文件
在hadoop中将多个文件合并为一个文件