java - Getting the file name / file data as the map's key/value input when running a Hadoop MapReduce job

Asked by 0sgqnhkj on 2021-06-03 in Hadoop

How do I get the file name / file contents as the key/value input for the map when running a Hadoop MapReduce job? I found a question covering this here, and while it explains the concept, I could not successfully translate it into code.
Basically, I want the file name as the key and the file data as the value. To that end I wrote a custom RecordReader, as suggested in the question above. But I don't understand how to get the file name as the key inside that class. Also, when writing the custom FileInputFormat class, I don't understand how to return the custom RecordReader I wrote earlier.
The RecordReader code is:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CustomRecordReader extends RecordReader<Text, Text> {

    private static final String LINE_SEPARATOR = System.getProperty("line.separator");

    private StringBuffer valueBuffer = new StringBuffer("");
    private Text key = new Text();
    private Text value = new Text();
    private RecordReader<Text, Text> recordReader;

    public SPDRecordReader(RecordReader<Text, Text> recordReader) {
        this.recordReader = recordReader;
    }

    @Override
    public void close() throws IOException {
        recordReader.close();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return recordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit arg0, TaskAttemptContext arg1)
            throws IOException, InterruptedException {
        recordReader.initialize(arg0, arg1);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (valueBuffer.equals("")) {
            while (recordReader.nextKeyValue()) {
                valueBuffer.append(recordReader.getCurrentValue());
                valueBuffer.append(LINE_SEPARATOR);
            }
            value.set(valueBuffer.toString());
            return true;
        }
        return false;
    }

}

The incomplete FileInputFormat class is:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class CustomFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1,
            Reporter arg2) throws IOException {
        return null;
    }
}

gab6jxml (Answer #1)

Put this code in your CustomRecordReader class:

// imports needed at the top of the file
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;

private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;
private String fileName;

public CustomRecordReader(JobConf job, FileSplit split) throws IOException {
    lineReader = new LineRecordReader(job, split);
    // reusable buffers of the types LineRecordReader actually emits
    lineKey = lineReader.createKey();
    lineValue = lineReader.createValue();
    // the file name that becomes the key of every record in this split
    fileName = split.getPath().getName();
}

public boolean next(Text key, Text value) throws IOException {
    // get the next line; LineRecordReader's key is a LongWritable byte
    // offset, so read into the internal buffers, not the Text arguments
    if (!lineReader.next(lineKey, lineValue)) {
        return false;
    }

    key.set(fileName);
    value.set(lineValue);

    return true;
}

public Text createKey() {
    return new Text("");
}

public Text createValue() {
    return new Text("");
}

Remove the SPDRecordReader constructor (it is a mistake).
And put this code in your CustomFileInputFormat class:
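Note that these snippets use the old org.apache.hadoop.mapred API, which is the one your CustomFileInputFormat already imports; your current CustomRecordReader extends the new org.apache.hadoop.mapreduce.RecordReader, so switch it to implement org.apache.hadoop.mapred.RecordReader<Text, Text> instead. That interface also requires getPos(), getProgress(), and close(); a minimal sketch that simply delegates to the wrapped LineRecordReader:

public long getPos() throws IOException {
    // current byte offset within the split, from the wrapped reader
    return lineReader.getPos();
}

public float getProgress() throws IOException {
    return lineReader.getProgress();
}

public void close() throws IOException {
    lineReader.close();
}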

public RecordReader<Text, Text> getRecordReader(
  InputSplit input, JobConf job, Reporter reporter)
  throws IOException {

    reporter.setStatus(input.toString());
    return new CustomRecordReader(job, (FileSplit)input);
}
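For completeness, here is a minimal driver sketch showing how the custom input format plugs into an old-API job. The class name FileNameJob, the identity mapper/reducer, and the use of args[0]/args[1] as input/output paths are illustrative assumptions, not part of the original question:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class FileNameJob {  // hypothetical driver class for illustration
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(FileNameJob.class);
        conf.setJobName("filename-as-key");

        // hand each mapper (file name, line) pairs via the custom format
        conf.setInputFormat(CustomFileInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // identity mapper/reducer just pass the pairs straight through
        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

With this setup each map call receives (file name, one line of the file). If you want the whole file contents as a single value, keep isSplitable returning false and accumulate all the lines inside one next() call, which is what your original nextKeyValue() was attempting.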
