如何合并spark结果文件而不重新分区和copymerge?

n3h0vuf2  于 2021-05-30  发布在  Hadoop
关注(0)|答案(3)|浏览(384)

我使用下一个代码:

csv.saveAsTextFile(pathToResults, classOf[GzipCodec])

pathtoresults目录有许多文件,如part-0000、part-0001等。我可以使用fileutil.copymerge(),但它非常慢,需要在驱动程序上下载所有文件,然后在hadoop中上载它们。但是fileutil.copymerge()比:

csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])

如果不重新分区和fileutil.copymerge(),如何合并spark结果文件?

sg24os4d

sg24os4d1#

有着完全相同的问题,必须编写实现copymerge的pyspark代码(调用hadoop api):
https://github.com/tagar/stuff/blob/master/copymerge.py
不幸的是,copymerge作为一个独立的hadoopapi调用将在hadoop3.0中被弃用和删除。所以这个实现不依赖于hadoop的copymerge(它重新实现了它)。

bcs8qyzn

bcs8qyzn2#

coalesce(1)工作正常。我还看到了hadoop流选项,如果您想运行以下脚本,它可以动态合并hdfs文件:

$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
                   -Dmapred.reduce.tasks=1 \
                   -input "/hdfs/input/dir" \
                   -output "/hdfs/output/dir" \
                   -mapper cat \
                   -reducer cat
svmlkihl

svmlkihl3#

不幸的是,在spark中没有其他方法可以获得单个输出文件。而不是 repartition(1) 你可以用 coalesce(1) ,但带有参数 1 他们的行为是一样的。spark会将数据收集到内存中的单个分区中,如果数据太大,可能会导致oom错误。
在hdfs上合并文件的另一个选择可能是编写一个简单的mapreduce作业(或pig作业,或hadoop流作业),将整个目录作为输入,并使用单个reducer生成单个输出文件。但请注意,使用mapreduce方法时,所有数据都将首先复制到reducer本地文件系统,这可能会导致“空间不足”错误。
以下是一些关于同一主题的有用链接:
在reduce阶段后合并输出文件
合并hdfs文件
在hadoop中将多个文件合并为一个文件

相关问题