Hadoop 2: empty results when using a custom InputFormat

rhfm7lfc · posted 2021-06-02 · in Hadoop

I want to use my own FileInputFormat with a custom RecordReader to read CSV data into <Long, String> pairs.
Therefore I created the class MyTextInputFormat:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    public class MyTextInputFormat extends FileInputFormat<Long, String> {

        @Override
        public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
            reporter.setStatus(input.toString());
            return new MyStringRecordReader(job, (FileSplit) input);
        }

        @Override
        protected boolean isSplitable(FileSystem fs, Path filename) {
            return super.isSplitable(fs, filename);
        }
    }

And the class MyStringRecordReader:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;

    public class MyStringRecordReader implements RecordReader<Long, String> {

        private LineRecordReader lineReader;
        private LongWritable lineKey;
        private Text lineValue;

        public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
            lineReader = new LineRecordReader(job, split);
            lineKey = lineReader.createKey();
            lineValue = lineReader.createValue();
            System.out.println("constructor called");
        }

        @Override
        public void close() throws IOException {
            lineReader.close();
        }

        @Override
        public Long createKey() {
            return lineKey.get();
        }

        @Override
        public String createValue() {
            System.out.println("createValue called");
            return lineValue.toString();
        }

        @Override
        public long getPos() throws IOException {
            return lineReader.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            return lineReader.getProgress();
        }

        @Override
        public boolean next(Long key, String value) throws IOException {
            System.out.println("next called");
            // get the next line
            if (!lineReader.next(lineKey, lineValue)) {
                return false;
            }
            key = lineKey.get();
            value = lineValue.toString();
            System.out.println(key);
            System.out.println(value);
            return true;
        }
    }

In my Spark application I read the file by calling the sparkContext.hadoopFile method, but I only get empty output from the following code:

    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    import scala.Tuple2;

    public class AssociationRulesAnalysis {

        @SuppressWarnings("serial")
        public static void main(String[] args) {
            JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class)
                    .map(new Function<Tuple2<Long, String>, String>() {
                        @Override
                        public String call(Tuple2<Long, String> arg0) throws Exception {
                            System.out.println("map: " + arg0._2());
                            return arg0._2();
                        }
                    });
            List<String> asList = inputRdd.take(10);
            for (String s : asList) {
                System.out.println(s);
            }
        }
    }
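Note that the snippet never declares sc or inputFilePath. For completeness, a minimal hypothetical setup (the app name and input path below are placeholders, not taken from the question):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical driver setup assumed by the snippet above;
    // the app name and the input path are placeholders.
    SparkConf conf = new SparkConf().setAppName("AssociationRulesAnalysis");
    JavaSparkContext sc = new JavaSparkContext(conf);
    String inputFilePath = "hdfs:///path/to/input.csv";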

All I get back from the RDD is 10 empty lines. With the added prints, the output looks like this:

    === APP STARTED : local-1467182320798
    constructor called
    createValue called
    next called
    0
    ä1
    map:
    next called
    8
    ö2
    map:
    next called
    13
    ü3
    map:
    next called
    18
    ß4
    map:
    next called
    23
    ä5
    map:
    next called
    28
    ö6
    map:
    next called
    33
    ü7
    map:
    next called
    38
    ß8
    map:
    next called
    43
    ä9
    map:
    next called
    48
    ü10
    map:
    next called
    54
    ä11
    map:
    next called
    60
    ß12
    map:
    next called
    12
    =====================
    constructor called
    createValue called
    next called
    0
    ä1
    map:
    next called
    8
    ö2
    map:
    next called
    13
    ü3
    map:
    next called
    18
    ß4
    map:
    next called
    23
    ä5
    map:
    next called
    28
    ö6
    map:
    next called
    33
    ü7
    map:
    next called
    38
    ß8
    map:
    next called
    43
    ä9
    map:
    next called
    48
    ü10
    map:
    Stopping...

The RDD data is printed below the ===== line (the 10 empty lines!!!). The output above the ===== seems to come from an RDD.count call. Yet the next method prints the correct keys and values!? What am I doing wrong?

zdwk9cvp 1#

lineKey and lineValue are never initialized into the key and value objects that are passed to the overridden next method of MyStringRecordReader; that is why the RecordReader always yields empty output. If you want a different key and value for a record in the file, you have to use the key and value objects passed into next and fill them with your computed key and value. In Java, reassigning a parameter only rebinds the local reference, so the following two lines have no effect on the caller: the values read from the file never leave the reader, and Spark keeps seeing the 0L key and the empty string produced by createKey and createValue. If you do not intend to transform the records, simply remove them.

    key = lineKey.get();
    value = lineValue.toString();
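For reference, here is a minimal sketch of one way to make the reader work. It is not part of the answer above: it switches from Long/String to Hadoop's mutable LongWritable/Text types, because the old mapred API is built around key/value objects that next() fills in place (with immutable Long and String there is nothing to fill):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;

    // Sketch: uses the mutable Writable types the old mapred API expects,
    // so next() can update the caller's key/value objects in place.
    public class MyWritableRecordReader implements RecordReader<LongWritable, Text> {

        private final LineRecordReader lineReader;

        public MyWritableRecordReader(JobConf job, FileSplit split) throws IOException {
            lineReader = new LineRecordReader(job, split);
        }

        @Override
        public LongWritable createKey() {
            return new LongWritable();
        }

        @Override
        public Text createValue() {
            return new Text();
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            // Delegate to LineRecordReader, which mutates key and value
            // in place instead of reassigning the parameters.
            return lineReader.next(key, value);
        }

        @Override
        public long getPos() throws IOException {
            return lineReader.getPos();
        }

        @Override
        public float getProgress() throws IOException {
            return lineReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            lineReader.close();
        }
    }

The input format would then extend FileInputFormat<LongWritable, Text>, and the Spark call becomes sc.hadoopFile(inputFilePath, MyTextInputFormat.class, LongWritable.class, Text.class). Keep in mind that Spark reuses the Writable objects between records, so convert them (for example with value.toString()) in a map before collecting or caching the RDD.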
