我在hdfs中有1000多个文件可用,命名约定为 1_fileName.txt 至 N_fileName.txt . 每个文件的大小为1024MB。我需要将这些文件合并到一个(hdfs),保持文件的顺序。说 5_FileName.txt 只应在后面追加 4_fileName.txt 执行此操作的最佳和最快方法是什么。是否有任何方法可以在不复制数据节点之间的实际数据的情况下执行此合并?例如:获取此文件的块位置,并在namenode中使用这些块位置创建一个新条目(文件名)?
1_fileName.txt
N_fileName.txt
5_FileName.txt
4_fileName.txt
sczxawaw1#
有一个api方法org.apache.hadoop.fs.fileutil.copymerge执行此操作:
public static boolean copyMerge( FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString)
它读取所有文件 srcDir 按字母顺序排列并将其内容附加到文件中。
srcDir
j8ag8udp2#
我为pyspark编写了一个实现,因为我们经常使用它。模仿hadoop的 copyMerge() 并使用相同的低级hadoop api来实现这一点。https://github.com/tagar/abalon/blob/v2.3.3/abalon/spark/sparkutils.py#l335它保持文件名的字母顺序。
copyMerge()
g0czyy6m3#
如果你能用Spark。可以这样做
sc.textFile("hdfs://...../part*).coalesce(1).saveAsTextFile("hdfs://...../filename)
希望这能起作用,因为spark是以分布式方式工作的,所以您不必将文件复制到一个节点中。尽管这只是一个警告,但如果文件非常大,在spark中合并文件可能会很慢。
new9mtju4#
由于文件顺序很重要,而词典顺序无法实现此目的,因此似乎可以为该任务编写Map程序,该程序可能会定期运行。当然没有reducer,将其作为hdfsMap任务编写是非常有效的,因为它可以将这些文件合并到一个输出文件中,而不需要跨数据节点进行太多的数据移动。由于源文件在hdfs中,而且Map器任务将尝试数据关联,因此它可以合并文件,而无需跨不同的数据节点移动文件。mapper程序将需要一个自定义inputsplit(在输入目录中获取文件名并根据需要排序)和一个自定义inputformat。Map器可以使用hdfs append,也可以使用原始输出流,在其中可以写入byte[]。我正在考虑的mapper程序的大致草图如下:
public class MergeOrderedFileMapper extends MapReduceBase implements Mapper<ArrayWritable, Text, ??, ??> { FileSystem fs; public void map(ArrayWritable sourceFiles, Text destFile, OutputCollector<??, ??> output, Reporter reporter) throws IOException { //Convert the destFile to Path. ... //make sure the parent directory of destFile is created first. FSDataOutputStream destOS = fs.append(destFilePath); //Convert the sourceFiles to Paths. List<Path> srcPaths; .... .... for(Path p: sourcePaths) { FSDataInputStream srcIS = fs.open(p); byte[] fileContent srcIS.read(fileContent); destOS.write(fileContent); srcIS.close(); reporter.progress(); // Important, else mapper taks may timeout. } destOS.close(); // Delete source files. for(Path p: sourcePaths) { fs.delete(p, false); reporter.progress(); } } }
n3ipq98p5#
没有有效的方法可以做到这一点,您需要将所有数据移到一个节点,然后再移回hdfs。执行此操作的命令行scriptlet可以如下所示:
hadoop fs -text *_fileName.txt | hadoop fs -put - targetFilename.txt
这会将所有与glob匹配的文件cat到标准输出,然后将该流通过管道传输到put命令,并将该流输出到名为targetfilename.txt的hdfs文件唯一的问题是你的文件名结构-如果你有固定的宽度,零填充数字部分,这将更容易,但在它的当前状态下,你会得到一个意外的lexigraphic顺序(1,10,100,1000,11,110等),而不是数字顺序(1,2,3,4等)。您可以通过将scriptlet修改为:
hadoop fs -text [0-9]_fileName.txt [0-9][0-9]_fileName.txt \ [0-9][0-9[0-9]_fileName.txt | hadoop fs -put - targetFilename.txt
5条答案
按热度按时间sczxawaw1#
有一个api方法org.apache.hadoop.fs.fileutil.copymerge执行此操作:
它读取所有文件
srcDir
按字母顺序排列并将其内容附加到文件中。j8ag8udp2#
我为pyspark编写了一个实现,因为我们经常使用它。
模仿hadoop的
copyMerge()
并使用相同的低级hadoop api来实现这一点。https://github.com/tagar/abalon/blob/v2.3.3/abalon/spark/sparkutils.py#l335
它保持文件名的字母顺序。
g0czyy6m3#
如果你能用Spark。可以这样做
希望这能起作用,因为spark是以分布式方式工作的,所以您不必将文件复制到一个节点中。尽管这只是一个警告,但如果文件非常大,在spark中合并文件可能会很慢。
new9mtju4#
由于文件顺序很重要,而词典顺序无法实现此目的,因此似乎可以为该任务编写Map程序,该程序可能会定期运行。当然没有reducer,将其作为hdfsMap任务编写是非常有效的,因为它可以将这些文件合并到一个输出文件中,而不需要跨数据节点进行太多的数据移动。由于源文件在hdfs中,而且Map器任务将尝试数据关联,因此它可以合并文件,而无需跨不同的数据节点移动文件。
mapper程序将需要一个自定义inputsplit(在输入目录中获取文件名并根据需要排序)和一个自定义inputformat。
Map器可以使用hdfs append,也可以使用原始输出流,在其中可以写入byte[]。
我正在考虑的mapper程序的大致草图如下:
n3ipq98p5#
没有有效的方法可以做到这一点,您需要将所有数据移到一个节点,然后再移回hdfs。
执行此操作的命令行scriptlet可以如下所示:
这会将所有与glob匹配的文件cat到标准输出,然后将该流通过管道传输到put命令,并将该流输出到名为targetfilename.txt的hdfs文件
唯一的问题是你的文件名结构-如果你有固定的宽度,零填充数字部分,这将更容易,但在它的当前状态下,你会得到一个意外的lexigraphic顺序(1,10,100,1000,11,110等),而不是数字顺序(1,2,3,4等)。您可以通过将scriptlet修改为: