Hadoop > Mapper class input error

pbwdgjma, published 2021-06-03 in Hadoop

The content of my input text file is:

    1 "Come
    1 "Defects,"
    1 "I
    1 "Information
    1 "J"
    2 "Plain
    5 "Project
    1 "Right
    1 "Viator"

The number on the left and the word on the right are separated by a tab, but when I run the following mapper function:

    public static class SortingMapper extends Mapper<Text, Text, Pair, NullWritable>
    {
        private Text word = new Text();
        private IntWritable freq = new IntWritable();

        @Override
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            String[] words = line.split("\t");
            freq = new IntWritable(Integer.parseInt(words[0]));
            word.set(words[1]);
            context.write(new Pair(word, freq), NullWritable.get());
        }
    }

    public static class FirstPartitioner extends Partitioner<Pair, NullWritable>
    {
        @Override
        public int getPartition(Pair p, NullWritable n, int numPartitions)
        {
            String word = p.getFirst().toString();
            char first = word.charAt(0);
            char middle = 'n';
            if (middle < first)
            {
                return 0;
            }
            else
                return 1 % numPartitions; // why does % need???
        }
    }

    public static class KeyComparator extends WritableComparator
    {
        protected KeyComparator()
        {
            super(Pair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2)
        {
            Pair v1 = (Pair) w1;
            Pair v2 = (Pair) w2;
            /*
             * since we already count words in the first MR we only need to sort the list by frequency,
             * so no need to compare Text again
            int cmp = Pair.compare(v1.getFirst(), v2.getFirst());
            if (cmp != 0) { return cmp; }
            */
            return -1 * v1.compareTo(v2);
            // possible error: it compares Text first and then compares IntWritable
        }
    }

    public static class GroupComparator extends WritableComparator
    {
        protected GroupComparator()
        {
            super(Pair.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2)
        {
            Pair v1 = (Pair) w1;
            Pair v2 = (Pair) w2;
            return v1.getFirst().compareTo(v2.getFirst());
            // this compareTo is declared in BinaryComparable
        }
    }

    public static class SortingReducer extends Reducer<Pair, NullWritable, Pair, NullWritable>
    {
        @Override
        public void reduce(Pair p, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
        {
            System.out.println("sortingReducer");
            context.write(p, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf2 = new Configuration();
        //String[] otherArgs2 = new GenericOptionsParser(conf1, args).getRemainingArgs();
        ControlledJob cJob2 = new ControlledJob(conf2);
        //conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
        cJob2.setJobName("Sorting");
        Job job2 = cJob2.getJob();
        job2.setJarByClass(Sorting.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        job2.setMapperClass(SortingMapper.class);
        job2.setPartitionerClass(FirstPartitioner.class);
        job2.setSortComparatorClass(KeyComparator.class);
        job2.setGroupingComparatorClass(GroupComparator.class);
        job2.setReducerClass(SortingReducer.class);
        job2.setOutputKeyClass(Pair.class);
        job2.setOutputValueClass(NullWritable.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job2, new Path("hdfs:///tmp/inter/part-r-00000.txt"));
        FileOutputFormat.setOutputPath(job2, new Path(args[0]));
        job2.waitForCompletion(true);
    }
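As an aside on the "why does % need???" comment in FirstPartitioner: getPartition must return an index in the range [0, numPartitions). If the job happens to run with a single reducer, numPartitions is 1 and a bare `return 1` would be an out-of-range partition; `1 % numPartitions` folds it back to 0. A standalone sketch of the same logic (PartitionDemo is a hypothetical helper with the Hadoop types stripped out):

```java
public class PartitionDemo {
    // Same decision as the asker's FirstPartitioner: words starting after 'n'
    // go to partition 0, everything else to partition 1 % numPartitions.
    static int getPartition(String word, int numPartitions) {
        char first = word.charAt(0);
        return ('n' < first) ? 0 : 1 % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(getPartition("zebra", 2)); // 0
        System.out.println(getPartition("apple", 2)); // 1
        // with one reducer, 1 % 1 == 0: still a valid partition index
        System.out.println(getPartition("apple", 1)); // 0
    }
}
```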

I get the following errors:

    Error: java.lang.NumberFormatException: For input string: ""Come"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:481)
        at java.lang.Integer.parseInt(Integer.java:527)
        at Sorting$SortingMapper.map(Sorting.java:98)
        at Sorting$SortingMapper.map(Sorting.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

I suspect the problem is in the String[] words array, but I don't know how to fix it. I would appreciate any help correcting this error.
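This matches what KeyValueTextInputFormat does by default: each input line is split at the first tab, the key becomes the text before the tab ("1") and the value the text after it ("\"Come"). So value.toString() never contains a tab, split("\t")[0] is the word itself, and Integer.parseInt fails on it. A minimal sketch of that split, plain Java with no Hadoop dependency (KvSplitDemo is a hypothetical name):

```java
public class KvSplitDemo {
    // Mimics KeyValueTextInputFormat's default behavior: split each line
    // at the first tab into (key, value).
    static String[] split(String line) {
        int sep = line.indexOf('\t');
        if (sep < 0) return new String[] { line, "" }; // no tab: whole line is the key
        return new String[] { line.substring(0, sep), line.substring(sep + 1) };
    }

    public static void main(String[] args) {
        String[] kv = split("1\t\"Come");
        // key is "1", value is "\"Come" -- the tab is already consumed,
        // so splitting value on "\t" again yields the word, not the number
        System.out.println("key=[" + kv[0] + "] value=[" + kv[1] + "]");
    }
}
```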

Additionally, I realized that I had used

    job2.setInputFormatClass(KeyValueTextInputFormat.class);

in the main function, which already splits the key and value on the tab character, so I changed

    String line = value.toString();
    String[] words = line.split("\t");
    freq = new IntWritable(Integer.parseInt(words[0]));
    word.set(words[1]);

into

    String num = key.toString();
    freq = new IntWritable(Integer.parseInt(num));
    word = value;
    context.write(new Pair(word, freq), NullWritable.get());

It ran successfully, but the output is strange:

    Sorting$Pair@5b5b072f
    Sorting$Pair@5b5b072f
    Sorting$Pair@5b5b072f
    Sorting$Pair@5b5b072f
    Sorting$Pair@5b5b072f
    ....

My expected output is:

    5 "Project
    2 "Plain
    1 "Come
    1 "Defects,"
    1 "I
    1 "Information
    1 "J"
    1 "Right
    1 "Viator"

Did the change make things worse?


Answer #1, by 0yg35tkg:

You just need to override toString in your Pair class and return whatever you want as the final output for each record.
Something like this...

    class Pair {
        ...
        @Override
        public String toString() {
            return freq + " " + word;
        }
    }
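The Sorting$Pair@5b5b072f lines are the default Object.toString() (class name plus hash code), which TextOutputFormat uses to render each key. A runnable sketch of the fix, with hypothetical freq/word fields since the real Pair class isn't shown in the question:

```java
public class Pair {
    private final int freq;
    private final String word;

    public Pair(int freq, String word) {
        this.freq = freq;
        this.word = word;
    }

    // Without this override, printing a Pair falls back to Object.toString(),
    // producing output like Pair@5b5b072f instead of the record's fields.
    @Override
    public String toString() {
        return freq + " " + word;
    }

    public static void main(String[] args) {
        System.out.println(new Pair(5, "\"Project")); // prints: 5 "Project
    }
}
```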
