How do I skip files that no longer exist in S3 when using multiple inputs in Hadoop MapReduce?

x33g5p2x · posted 2021-07-15 in Hadoop

I have the code below, which lets each mapper consume multiple files as long as each file is under a certain size limit:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

public class MyMultiFileInputFormat extends CombineFileInputFormat<Text, Text> {

    static class MyMultiFileRecordReader extends RecordReader<Text, Text> {
        private final KeyValueLineRecordReader reader;
        private final int index;

        public MyMultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context,
                Integer index) throws IOException {
            this.index = index;
            this.reader = new KeyValueLineRecordReader(context.getConfiguration());
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            CombineFileSplit combineSplit = (CombineFileSplit) split;
            Path file = combineSplit.getPath(index);
            long start = combineSplit.getOffset(index);
            long length = combineSplit.getLength(index);
            String[] hosts = combineSplit.getLocations();
            FileSplit fileSplit = new FileSplit(file, start, length, hosts);
            reader.initialize(fileSplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return reader.nextKeyValue();
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return reader.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return reader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return reader.getProgress();
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }

    public MyMultiFileInputFormat() {
        super();
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context,
                MyMultiFileRecordReader.class);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}
```

But now, when a file does not exist, I want to skip it instead of failing with an error. The S3 location is not under my control, and I run into this problem even when my main method only gathers files that exist before calling:

```java
FileInputFormat.setInputPaths(job, inputPaths);
```

A file can still disappear even after the code above has run.
Is there a way to handle this?
Can I override InputFormat to do it?
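The existence check I currently do in main looks roughly like this (a self-contained sketch: `keepExisting` is a name made up for illustration, and plain `java.nio` paths stand in for the real `FileSystem.exists()` calls against `s3a://` paths so the snippet runs anywhere):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SkipMissing {

    // Keep only the paths that still exist at call time. In the real job this
    // check would be FileSystem.exists() against the S3 bucket; java.nio
    // paths stand in for S3 here so the sketch is self-contained.
    public static List<Path> keepExisting(List<Path> candidates) {
        List<Path> present = new ArrayList<>();
        for (Path p : candidates) {
            if (Files.exists(p)) {
                present.add(p);
            }
        }
        return present;
    }

    public static void main(String[] args) throws IOException {
        Path real = Files.createTempFile("input-", ".txt");
        Path gone = real.resolveSibling("already-deleted.txt");

        // The missing file is dropped; only the existing one is kept.
        List<Path> kept = keepExisting(Arrays.asList(real, gone));
        System.out.println(kept.size());

        // The kept list would then go to FileInputFormat.setInputPaths(...).
        Files.deleteIfExists(real);
    }
}
```

The race is exactly the problem: this check passes, the job is submitted, and an object can be deleted before the record reader opens it.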
