Hadoop: Map program does not read files from multiple input paths

nwwlzxa7, posted on 2021-06-03 in Hadoop

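My map program fails to read files from multiple directories. Can anyone help? I need each mapper to read exactly one file. I added multiple input paths and implemented a custom WholeFileInputFormat and WholeFileRecordReader. I do not need the input key in the map method. I made sure that each map task reads one complete file.
Command line: hadoop jar autoproduce.jar autoproduce /input_a /input_b /output
I specified two input paths: 1. input_a; 2. input_b.
Run method snippet: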

    Job job = new Job(getConf());
    job.setInputFormatClass(WholeFileInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]), new Path(args[1]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));

Map method snippet:

    public void map(NullWritable key, BytesWritable value, Context context) {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        System.out.println("Directory :" + fileSplit.getPath().toString());
        ......
    }

Custom WholeFileInputFormat:

    class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

        // Never split a file, so each file becomes exactly one input split.
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }

        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initialize(split, context);
            return reader;
        }
    }
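For reference, the behaviour I expect from an unsplittable input format is one split per file, collected across every input path. The following is only a plain-Java analogy on the local filesystem, not Hadoop code: the class `WholeFileSplitSketch` and the method `planSplits` are hypothetical names, and the sketch ignores details such as hidden-file filtering.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Local-filesystem analogy only: this does not call any Hadoop API.
final class WholeFileSplitSketch {

    // Returns one entry per regular file found directly under each input
    // directory, mirroring "one unsplittable file = one split = one mapper".
    static List<Path> planSplits(List<Path> inputDirs) {
        List<Path> splits = new ArrayList<>();
        for (Path dir : inputDirs) {
            try (Stream<Path> entries = Files.list(dir)) {
                // Subdirectories are skipped; files are sorted within a directory.
                entries.filter(Files::isRegularFile).sorted().forEach(splits::add);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return splits;
    }
}
```

With two input directories holding two files and one file respectively, this yields three entries, so three map tasks would be expected, with at least one from each directory.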

Custom WholeFileRecordReader:

    class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }

        // Emits the whole file as a single record, then reports end of input.
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!processed) {
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        @Override
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException {
            return processed ? 1.0f : 0.0f;
        }

        @Override
        public void close() throws IOException {
            // do nothing
        }
    }
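The reader's contract is "return true once with the whole file, then false". Here is a minimal plain-Java stand-in that can be exercised without a cluster. It does not extend Hadoop's RecordReader, and the names `SingleRecordFileReader` and `checkedLength` are hypothetical. It also guards the length conversion, because a plain `(int)` cast of a length above Integer.MAX_VALUE would wrap silently.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Plain-Java stand-in for the reader above; it uses no Hadoop classes.
final class SingleRecordFileReader {
    private final Path file;
    private byte[] value = new byte[0];
    private boolean processed = false;

    SingleRecordFileReader(Path file) {
        this.file = file;
    }

    // Converts a file length to an array size, throwing ArithmeticException
    // instead of silently wrapping when the length exceeds Integer.MAX_VALUE.
    static int checkedLength(long length) {
        return Math.toIntExact(length);
    }

    // Returns true exactly once, after loading the whole file as the single record.
    boolean nextKeyValue() {
        if (processed) {
            return false;
        }
        try (DataInputStream in = new DataInputStream(Files.newInputStream(file))) {
            byte[] contents = new byte[checkedLength(Files.size(file))];
            in.readFully(contents);
            value = contents;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        processed = true;
        return true;
    }

    byte[] getCurrentValue() {
        return value;
    }

    float getProgress() {
        return processed ? 1.0f : 0.0f;
    }
}
```

Calling `nextKeyValue()` a second time returns false without touching the file again, which matches the `processed` flag logic in the Hadoop version.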

Problem:
After setting two input paths, all map tasks read files from only one directory.
Thanks in advance.

mbjcgjjk #1

You have to use MultipleInputs instead of FileInputFormat in the driver. So your code should be:

    MultipleInputs.addInputPath(job, new Path(args[0]), <Input_Format_Class_1>);
    MultipleInputs.addInputPath(job, new Path(args[1]), <Input_Format_Class_2>);
    ...
    MultipleInputs.addInputPath(job, new Path(args[N-1]), <Input_Format_Class_N>);

So if you want to use WholeFileInputFormat for the first input path and TextInputFormat for the second input path, you have to do it like this:

    MultipleInputs.addInputPath(job, new Path(args[0]), WholeFileInputFormat.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class);

Hope this works for you!
