如何在hadoop中将多个序列文件合并成一个序列文件。
smtd7mpg1#
不能对序列文件使用hadoop getmerge,因为它会将它们合并为二进制文件,而不是序列文件(因此合并后的文件中会有很多头文件…)。因此,您可以像@donald miner建议的那样,使用单个reducer编写一个小型hadoop作业,或者使用 SequenceFile.Reader 以及 SeuquenceFile.Writer .我选择了第二个选项,下面是我的代码:
SequenceFile.Reader
SeuquenceFile.Writer
package ru.mail.go.webbase.markov.hadoop.utils; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; public class SequenceFilesUtils { private static final Configuration conf = HBaseConfiguration.create(); public static <K, V> void merge(Path fromDirectory, Path toFile, Class<K> keyClass, Class<V> valueClass) throws IOException { FileSystem fs = FileSystem.get(conf); if (!fs.isDirectory(fromDirectory)) { throw new IllegalArgumentException("'" + fromDirectory.toString() + "' is not a directory"); } SequenceFile.Writer writer = SequenceFile.createWriter( conf, SequenceFile.Writer.file(toFile), SequenceFile.Writer.keyClass(keyClass), SequenceFile.Writer.valueClass(valueClass) ); for (FileStatus status : fs.listStatus(fromDirectory)) { if (status.isDirectory()) { System.out.println("Skip directory '" + status.getPath().getName() + "'"); continue; } Path file = status.getPath(); if (file.getName().startsWith("_")) { System.out.println("Skip \"_\"-file '" + file.getName() + "'"); //There are files such "_SUCCESS"-named in jobs' ouput folders continue; } System.out.println("Merging '" + file.getName() + "'"); SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file)); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); while (reader.next(key, value)) { writer.append(key, value); } reader.close(); } writer.close(); } }
这是我的测试:
public class SequenceFilesUtilsTest { private static final String OUT_PATH = "./UNIVERSE/SequenceFilesUtilsTest/"; @Before public void initEnviroment() throws IOException { TestUtils.createDirectory(OUT_PATH); TestUtils.createDirectory(OUT_PATH + "/in"); } @Test public void test() throws Exception { Configuration conf = HBaseConfiguration.create(); Path inPath1 = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in/in1.seq"); System.out.println("Saving first part to '" + inPath1 + "'"); SequenceFile.Writer writer1 = SequenceFile.createWriter( conf, SequenceFile.Writer.file(inPath1), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(Text.class) ); writer1.append(new LongWritable(101), new Text("FIRST1")); writer1.append(new LongWritable(102), new Text("FIRST2")); writer1.append(new LongWritable(103), new Text("FIRST3")); writer1.append(new LongWritable(104), new Text("FIRST4")); writer1.close(); Path inPath2 = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in/in2.seq"); System.out.println("Saving second part to '" + inPath2 + "'"); SequenceFile.Writer writer2 = SequenceFile.createWriter( conf, SequenceFile.Writer.file(inPath2), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(Text.class) ); writer2.append(new LongWritable(201), new Text("SND1")); writer2.append(new LongWritable(202), new Text("SND2")); writer2.append(new LongWritable(203), new Text("SND3")); writer2.close(); SequenceFilesUtils.merge( new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in"), new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/merged.seq"), LongWritable.class, Text.class); Path mergedPath = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/merged.seq"); SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(mergedPath)); LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf); reader.next(key, value); Assert.assertEquals(101, key.get()); Assert.assertEquals("FIRST1", value.toString()); reader.next(key, value); Assert.assertEquals(102, key.get()); Assert.assertEquals("FIRST2", value.toString()); reader.next(key, value); Assert.assertEquals(103, key.get()); Assert.assertEquals("FIRST3", value.toString()); reader.next(key, value); Assert.assertEquals(104, key.get()); Assert.assertEquals("FIRST4", value.toString()); reader.next(key, value); Assert.assertEquals(201, key.get()); Assert.assertEquals("SND1", value.toString()); reader.next(key, value); Assert.assertEquals(202, key.get()); Assert.assertEquals("SND2", value.toString()); reader.next(key, value); Assert.assertEquals(203, key.get()); Assert.assertEquals("SND3", value.toString()); reader.close(); } }
093gszye2#
如果要处理大量的序列文件,我建议编写一个使用 Mapper 作为你的Map绘制者 Reducer 作为你的减速机。对于i/o格式,使用 SequenceFileInputFormat 以及 SequenceFileOutputFormat . 将减速器的数量设置为1。这些都是您在驱动程序/主代码的配置和作业对象中设置的内容。请参见如何设置输出格式、如何设置输入格式、如何设置Map器以及如何设置缩减器。请注意 Mapper 以及 Reducer 就是对数据不做任何处理--只是传递它。这就是为什么不在这里编写map函数或reduce函数。这将要做的是加载序列文件,对Map器中的数据不做任何处理,将所有记录洗牌到reducer,然后将它们全部输出到一个文件中。这样做的副作用是对输出序列文件中的键进行排序。
Mapper
Reducer
SequenceFileInputFormat
SequenceFileOutputFormat
mbjcgjjk3#
你考虑过搬家吗?我写它是为了处理某些sequencefile杂务,包括sequencefile合并。在您的情况下,您可以运行:
forqlift seq2seq --file new_combined_file.seq \ original_file1.seq original_file2.seq original_file3.seq ...
是的,福克利夫特 seq2seq 工具标记为“实验性”。。。但在我的内部测试中效果很好。
seq2seq
cgh8pdjw4#
如果要将多个文件合并到单个文件中,则有两个AN:
getmerge
用法: hadoop fs -getmerge <src> <localdst> 将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件中。可以选择将addnl设置为允许在每个文件的末尾添加换行符。
hadoop fs -getmerge <src> <localdst>
org.apache.hadoop.fs.FileUtil.copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString);
将目录中的所有文件复制到一个输出文件(合并)
put
用法: hadoop dfs -put <localsrc> ... <dst> 将单个src或多个src从本地文件系统复制到目标文件系统。还从stdin读取输入并写入目标文件系统。
hadoop dfs -put <localsrc> ... <dst>
copyFromLocal
用法: hadoop dfs -copyFromLocal <localsrc> URI 与put命令类似,只是源代码仅限于本地文件引用。
hadoop dfs -copyFromLocal <localsrc> URI
4条答案
按热度按时间smtd7mpg1#
不能对序列文件使用hadoop getmerge,因为它会将它们合并为二进制文件,而不是序列文件(因此合并后的文件中会有很多头文件…)。
因此,您可以像@donald miner建议的那样,使用单个reducer编写一个小型hadoop作业,或者使用
SequenceFile.Reader
以及SeuquenceFile.Writer
.我选择了第二个选项,下面是我的代码:
这是我的测试:
093gszye2#
如果要处理大量的序列文件,我建议编写一个使用
Mapper
作为你的Map绘制者Reducer
作为你的减速机。对于i/o格式,使用SequenceFileInputFormat
以及SequenceFileOutputFormat
. 将减速器的数量设置为1。这些都是您在驱动程序/主代码的配置和作业对象中设置的内容。请参见如何设置输出格式、如何设置输入格式、如何设置Map器以及如何设置缩减器。请注意
Mapper
以及Reducer
就是对数据不做任何处理--只是传递它。这就是为什么不在这里编写map函数或reduce函数。这将要做的是加载序列文件,对Map器中的数据不做任何处理,将所有记录洗牌到reducer,然后将它们全部输出到一个文件中。这样做的副作用是对输出序列文件中的键进行排序。
mbjcgjjk3#
你考虑过搬家吗?我写它是为了处理某些sequencefile杂务,包括sequencefile合并。
在您的情况下,您可以运行:
是的,福克利夫特
seq2seq
工具标记为“实验性”。。。但在我的内部测试中效果很好。cgh8pdjw4#
如果要将多个文件合并到单个文件中,则有两个AN:
母语
用法:
hadoop fs -getmerge <src> <localdst>
将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件中。可以选择将addnl设置为允许在每个文件的末尾添加换行符。java应用编程接口
将目录中的所有文件复制到一个输出文件(合并)
复制到hdfs
用法:
hadoop dfs -put <localsrc> ... <dst>
将单个src或多个src从本地文件系统复制到目标文件系统。还从stdin读取输入并写入目标文件系统。用法:
hadoop dfs -copyFromLocal <localsrc> URI
与put命令类似,只是源代码仅限于本地文件引用。