Hadoop 中 InputStream 的 Premature EOF(过早 EOF)错误

fhity93d  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(319)

我想在 Hadoop 中逐块(不是逐行)读取大文件,其中每个块的大小接近 5 MB。为此我编写了一个自定义的 RecordReader。但在读取时,`nextKeyValue()` 中调用 `readFully()` 抛出了 `Premature EOF from inputStream` 错误。
这是我的代码:

  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FSDataInputStream;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IOUtils;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.InputSplit;
  9. import org.apache.hadoop.mapreduce.RecordReader;
  10. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  11. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  12. public class WholeFileRecordReader extends RecordReader<Text, apriori> {
  13. public Text key = new Text("");
  14. public apriori value = new apriori();
  15. public Configuration job;
  16. public FileSplit filesplit;
  17. public FSDataInputStream in;
  18. public Boolean processed = false;
  19. public int len = 5000000;
  20. public long filepointer = 0;
  21. public int mapperFlag = 0;
  22. public WholeFileRecordReader(FileSplit arg0, TaskAttemptContext arg1) {
  23. this.filesplit = arg0;
  24. this.job=arg1.getConfiguration();
  25. }
  26. @Override
  27. public void close() throws IOException {
  28. }
  29. @Override
  30. public Text getCurrentKey() throws IOException, InterruptedException {
  31. return key;
  32. }
  33. @Override
  34. public apriori getCurrentValue() throws IOException, InterruptedException {
  35. return value;
  36. }
  37. @Override
  38. public float getProgress() throws IOException, InterruptedException {
  39. return processed ? 1.0f : 0.0f;
  40. }
  41. @Override
  42. public void initialize(InputSplit arg0, TaskAttemptContext arg1)
  43. throws IOException, InterruptedException {
  44. this.job = arg1.getConfiguration();
  45. this.filesplit = (FileSplit)arg0;
  46. final Path file = filesplit.getPath();
  47. FileSystem fs = file.getFileSystem(job);
  48. in = fs.open(file);
  49. }
  50. @Override
  51. public boolean nextKeyValue() throws IOException, InterruptedException {
  52. if ((!processed)&&(filesplit.getLength()>filepointer)) {
  53. byte[] contents = new byte[ len];
  54. Path file = filesplit.getPath();
  55. key.set(file.getName());
  56. in.seek(filepointer);
  57. try {
  58. IOUtils.readFully(in, contents, 0, len);
  59. value.set(contents, 0, len);
  60. } finally {
  61. // IOUtils.closeStream(in);
  62. }
  63. filepointer = filepointer + len;
  64. processed = false;
  65. return true;
  66. }
  67. else if((!processed)&&(filesplit.getLength()<filepointer))
  68. {
  69. Path file = filesplit.getPath();
  70. key.set(file.getName());
  71. int last = (int)(filesplit.getLength()-(filepointer-len));
  72. byte[] contents = new byte[last];
  73. in.seek(filepointer-len);
  74. try {
  75. IOUtils.readFully(in, contents, 0, last);
  76. mapperFlag =1;
  77. value.set(contents, 0, last,mapperFlag);
  78. } finally {
  79. IOUtils.closeStream(in);
  80. }
  81. processed = true;
  82. return true;
  83. }
  84. return false;
  85. }
  86. }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题