未调用自定义recordreader initialize

h9vpoimq  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(424)

我最近开始使用hadoop,只是创建了自己的inputformat来处理pdf。
由于某些原因,我的自定义recordreader类没有调用它的initialize方法(用sysout检查,因为我还没有设置调试环境)
我在Windows7 32位上运行Hadoop2.2.0。用yarn jar做我的调用,因为hadoop jar在windows下被窃听了。。。

  1. import ...
  2. public class PDFInputFormat extends FileInputFormat<Text, Text>
  3. {
  4. @Override
  5. public RecordReader<Text, Text> getRecordReader(InputSplit arg0,
  6. JobConf arg1, Reporter arg2) throws IOException
  7. {
  8. return new PDFRecordReader();
  9. }
  10. public static class PDFRecordReader implements RecordReader<Text, Text>
  11. {
  12. private FSDataInputStream fileIn;
  13. public String fileName=null;
  14. HashSet<String> hset=new HashSet<String>();
  15. private Text key=null;
  16. private Text value=null;
  17. private byte[] output=null;
  18. private int position = 0;
  19. @Override
  20. public Text createValue() {
  21. int endpos = -1;
  22. for (int i = position; i < output.length; i++){
  23. if (output[i] == (byte) '\n') {
  24. endpos = i;
  25. }
  26. }
  27. if (endpos == -1) {
  28. return new Text(Arrays.copyOfRange(output,position,output.length));
  29. }
  30. return new Text(Arrays.copyOfRange(output,position,endpos));
  31. }
  32. @Override
  33. public void initialize(InputSplit genericSplit, TaskAttemptContext job) throws
  34. IOException, InterruptedException
  35. {
  36. System.out.println("initialization is called");
  37. FileSplit split=(FileSplit) genericSplit;
  38. Configuration conf=job.getConfiguration();
  39. Path file=split.getPath();
  40. FileSystem fs=file.getFileSystem(conf);
  41. fileIn= fs.open(split.getPath());
  42. fileName=split.getPath().getName().toString();
  43. System.out.println(fileIn.toString());
  44. PDDocument docum = PDDocument.load(fileIn);
  45. ByteArrayOutputStream boss = new ByteArrayOutputStream();
  46. OutputStreamWriter ow = new OutputStreamWriter(boss);
  47. PDFTextStripper stripper=new PDFTextStripper();
  48. stripper.writeText(docum, ow);
  49. ow.flush();
  50. output = boss.toByteArray();
  51. }
  52. }
  53. }
yws3nbqq

yws3nbqq1#

我昨晚想出来了,我可能会帮别人:
recordreader是一个不推荐使用的hadoop接口(hadoop.common.mapred),它实际上不包含initialize方法,这就解释了为什么不能自动调用它。
通过扩展hadoop.common.mapreduce中的recordreader类,可以扩展该类的initialize方法。

qqrboqgw

qqrboqgw2#

这个 System.out.println() 运行作业时可能没有帮助。以确保您的 initialize() 是叫还是不试试扔一些 RuntimeException 具体如下:

  1. @Override
  2. public void initialize(InputSplit genericSplit, TaskAttemptContext job) throws
  3. IOException, InterruptedException
  4. {
  5. throw new NullPointerException("inside initialize()");
  6. ....

这绝对可以。

相关问题