How can I skip a file when using multiple inputs in Hadoop MapReduce if the file does not exist in S3?

Posted by x33g5p2x on 2021-07-15 in Hadoop

I have the code below, which lets each mapper consume multiple files when the files are smaller than a certain size limit:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

// The enclosing class declaration was missing from the original snippet;
// "extends CombineFileInputFormat<Text, Text>" is assumed from the overrides below.
public class MyMultiFileInputFormat extends CombineFileInputFormat<Text, Text> {

    // Reads a single file of a CombineFileSplit by delegating to KeyValueLineRecordReader.
    static class MyMultiFileRecordReader extends RecordReader<Text, Text> {

        private final KeyValueLineRecordReader reader;
        private final int index; // position of this file inside the combined split

        // CombineFileRecordReader looks up this exact constructor signature by reflection.
        public MyMultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException {
            this.index = index;
            this.reader = new KeyValueLineRecordReader(context.getConfiguration());
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // Carve the file at 'index' out of the combined split and hand it to the delegate.
            CombineFileSplit combineSplit = (CombineFileSplit) split;
            Path file = combineSplit.getPath(index);
            long start = combineSplit.getOffset(index);
            long length = combineSplit.getLength(index);
            String[] hosts = combineSplit.getLocations();
            FileSplit fileSplit = new FileSplit(file, start, length, hosts);
            reader.initialize(fileSplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return reader.nextKeyValue();
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return reader.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return reader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return reader.getProgress();
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }

    public MyMultiFileInputFormat() {
        super();
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context, MyMultiFileRecordReader.class);
    }

    // Never split an individual file; each file is read whole by one reader.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}

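But now I want to skip a file that does not exist instead of failing with an error. I do not control the S3 location, so even if I add logic in main that passes only the files that exist, I can still run into the problem: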

FileInputFormat.setInputPaths(job, inputPaths);
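
For illustration, the driver-side existence check described above might look like the following sketch. `ExistingPathFilter` and `keepExisting` are hypothetical names, and the sketch uses `java.nio.file.Path` on a local file system so that it runs standalone. Against S3 the analogous check would presumably go through Hadoop's `FileSystem.exists(Path)` on `org.apache.hadoop.fs.Path` objects before the surviving paths are passed to `setInputPaths`.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original post.
final class ExistingPathFilter {
    private ExistingPathFilter() {}

    // Returns only the candidates that exist at the moment of the check.
    // A file can still be deleted after this method returns.
    static List<Path> keepExisting(List<Path> candidates) {
        List<Path> existing = new ArrayList<>();
        for (Path candidate : candidates) {
            if (Files.exists(candidate)) {
                existing.add(candidate);
            }
        }
        return existing;
    }
}
```

The check only narrows the window: it reflects the state of the store at one instant, so a file removed between the check and the moment a task opens it still causes a failure.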

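Even after the code above has run, a file can still disappear before a mapper reads it.
Is there a way to handle this?
Can I override the InputFormat?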
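
One direction that might work is to tolerate the missing file inside the per-file record reader rather than in the driver. The sketch below shows the pattern with plain `java.nio` so that it runs standalone. `SkippingLineReader` is a hypothetical stand-in, not the Hadoop API. In the reader from the question, the equivalent change would presumably be a `try`/`catch` for `java.io.FileNotFoundException` around `reader.initialize(fileSplit, context)`, a flag that makes `nextKeyValue()` return `false`, and matching guards in `getProgress()` and `close()`. Depending on the S3 connector, the exception may only surface on the first read, so `nextKeyValue()` might need the same guard. This is an assumption to verify, not a confirmed behavior.

```java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical stand-in for a per-file record reader; not the Hadoop API.
final class SkippingLineReader implements AutoCloseable {
    private BufferedReader reader; // stays null when the file was missing
    private String current;

    // Plays the role of RecordReader.initialize: open the file, but treat
    // a missing file as an empty input instead of propagating the error.
    void initialize(Path file) throws IOException {
        try {
            reader = Files.newBufferedReader(file);
        } catch (NoSuchFileException | FileNotFoundException e) {
            reader = null; // mark this file as skipped
        }
    }

    // Plays the role of nextKeyValue: a skipped file yields no records.
    boolean nextLine() throws IOException {
        if (reader == null) {
            return false;
        }
        current = reader.readLine();
        return current != null;
    }

    String getCurrentLine() {
        return current;
    }

    boolean wasSkipped() {
        return reader == null;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}
```

This only covers files that vanish after the splits were computed. A path that is already missing when the job is submitted fails earlier, during split calculation, so the driver-side filtering is still needed for that case.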
