Why is my mapper input identical to my reducer output?

k7fdbhmy · asked 2021-05-29 · in Hadoop
Follow (0) | Answers (1) | Views (529)

I have run into a curious situation: my mapper's input comes back unchanged as the reducer's output (the reducer code seems to have no effect). This is my first MapReduce job, as I am a beginner. Thanks in advance.
Problem statement: find the maximum temperature for each year.
Here is my dataset (the year and temp columns are tab-separated):

    2001	32
    2001	50
    2001	18
    2001	21
    2002	30
    2002	34
    2002	12
    2003	09
    2003	12
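For reference, the per-year maxima this dataset should yield can be sketched in plain Java, without Hadoop (the class and method names here are illustrative, not part of the job above):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A plain-Java sketch of the aggregation the MapReduce job is meant to perform:
// group tab-separated "year<TAB>temp" lines by year and keep the maximum temp.
class MaxTempSketch {
    static Map<String, Integer> maxPerYear(String[] lines) {
        Map<String, Integer> max = new LinkedHashMap<>();
        for (String line : lines) {
            String[] pair = line.split("\t");          // same split the mapper uses
            String year = pair[0].trim();
            int temp = Integer.parseInt(pair[1].trim());
            max.merge(year, temp, Math::max);          // keep the larger temp per year
        }
        return max;
    }

    public static void main(String[] args) {
        String[] data = {
            "2001\t32", "2001\t50", "2001\t18", "2001\t21",
            "2002\t30", "2002\t34", "2002\t12",
            "2003\t09", "2003\t12"
        };
        System.out.println(maxPerYear(data)); // prints {2001=50, 2002=34, 2003=12}
    }
}
```

So the correct output has one line per year, not one line per input record.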

Mapper code:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapperCode extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] keyValPair = line.split("\t");
            context.write(new Text(keyValPair[0].trim()),
                          new IntWritable(Integer.parseInt(keyValPair[1].trim())));
        }
    }

Reducer code:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class ReducerCode extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reducer(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int max = 0;
            for (IntWritable value : values) {
                max = Math.max(max, value.get());
            }
            context.write(key, new IntWritable(max));
        }
    }

Driver code:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MaxTemp extends Configuration {
        public static void main(String[] args) throws IOException, InterruptedException, Exception {
            Job job = new Job();
            job.setJobName("MaxTemp");
            job.setJarByClass(MaxTemp.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(MapperCode.class);
            job.setReducerClass(ReducerCode.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.waitForCompletion(true);
        }
    }

Please tell me where I went wrong. Why is my output identical to the input dataset?

cuxqih21

Your Reducer implementation must override the reduce() method. The method you implemented is named reducer(), which is never called by the framework.
Change it to:

    public class ReducerCode extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
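The mechanism is worth seeing without Hadoop: the framework's Reducer.run() dispatches to reduce(), and Hadoop's default reduce() simply writes every (key, value) pair through unchanged, which is exactly the identity output observed. A method with a different name is not an override, so the default keeps running. A plain-Java sketch of this (BaseReducer is an illustrative stand-in, not Hadoop's actual class):

```java
// BaseReducer mimics org.apache.hadoop.mapreduce.Reducer: run() always calls
// reduce(), and the default reduce() passes every value through unchanged.
class BaseReducer {
    public String reduce(String key, int[] values) {
        StringBuilder out = new StringBuilder();
        for (int v : values) out.append(key).append('\t').append(v).append('\n');
        return out.toString();
    }
    // The framework's dispatch point: it only ever knows about reduce().
    public final String run(String key, int[] values) { return reduce(key, values); }
}

class BrokenReducer extends BaseReducer {
    // Misspelled name: this is a brand-new method the framework never calls,
    // so run() still invokes the inherited identity reduce().
    public String reducer(String key, int[] values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) max = Math.max(max, v);
        return key + "\t" + max + "\n";
    }
}

class FixedReducer extends BaseReducer {
    @Override // with this annotation, a misspelled name is a compile error
    public String reduce(String key, int[] values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) max = Math.max(max, v);
        return key + "\t" + max + "\n";
    }
}

class OverrideDemo {
    public static void main(String[] args) {
        int[] temps = {32, 50, 18, 21};
        System.out.print(new BrokenReducer().run("2001", temps)); // identity: four lines
        System.out.print(new FixedReducer().run("2001", temps));  // one line with the max
    }
}
```

Adding @Override to the real reduce() method is a cheap safeguard: the compiler then rejects any name or signature that does not actually override the parent method.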
