I have the following code, which lets each mapper consume multiple files as long as the files are under a certain size limit:
```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

public class MyMultiFileInputFormat extends CombineFileInputFormat<Text, Text> {

    static class MyMultiFileRecordReader extends RecordReader<Text, Text> {
        // Delegates to a KeyValueLineRecordReader for the single file at
        // position `index` inside the combined split.
        private final KeyValueLineRecordReader reader;
        private final int index;

        public MyMultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context,
                                       Integer index) throws IOException {
            this.index = index;
            this.reader = new KeyValueLineRecordReader(context.getConfiguration());
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            CombineFileSplit combineSplit = (CombineFileSplit) split;
            Path file = combineSplit.getPath(index);
            long start = combineSplit.getOffset(index);
            long length = combineSplit.getLength(index);
            String[] hosts = combineSplit.getLocations();
            // Carve a per-file FileSplit out of the combined split and hand
            // it to the delegate reader.
            FileSplit fileSplit = new FileSplit(file, start, length, hosts);
            reader.initialize(fileSplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return reader.nextKeyValue();
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return reader.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return reader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return reader.getProgress();
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }

    public MyMultiFileInputFormat() {
        super();
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        return new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context,
                MyMultiFileRecordReader.class);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}
```
But now I want to skip a file instead of getting an error when it doesn't exist (the S3 location is not under my control, so I run into problems even if my main() contains logic to collect only the files that exist):
```java
FileInputFormat.setInputPaths(job, inputPaths);
```
Even after the code above has run, files can still disappear.

Is there a way around this? Can I override InputFormat?
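One possible direction, sketched under assumptions rather than tested against S3: override `listStatus()` in the input format and silently drop input paths that no longer exist when splits are computed. The class name `SkipMissingInputFormat` is hypothetical, and this simplified version skips `FileInputFormat`'s usual glob expansion and hidden-file filtering:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical subclass of the question's input format that tolerates
// input paths vanishing between job setup and split computation.
public class SkipMissingInputFormat extends MyMultiFileInputFormat {
    @Override
    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        List<FileStatus> found = new ArrayList<>();
        for (Path p : FileInputFormat.getInputPaths(job)) {
            FileSystem fs = p.getFileSystem(job.getConfiguration());
            if (fs.exists(p)) { // skip paths that disappeared since setInputPaths
                Collections.addAll(found, fs.listStatus(p));
            }
        }
        return found;
    }
}
```

Note the race is only narrowed, not eliminated: a file can still vanish between this existence check and the record reader opening it, so catching `FileNotFoundException` in the reader as well may be worth considering.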