java—分析多个输入文件,并只输出一个包含单一最终结果的文件

alen0pnh  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(243)

我对mapreduce不是很了解。我需要实现的是从几个输入文件的分析中输出一行结果。目前,我的结果每个输入文件包含一行。因此,如果我有3个输入文件,我将有一个输出文件包含3行;每个输入的结果。因为我对结果进行排序,所以只需要将第一个结果写入hdfs文件。我的代码如下:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordLength {

    public static class Map extends Mapper<Object, Text, LongWritable, Text> {
       // private final static IntWritable one = new IntWritable(1);
        int max = Integer.MIN_VALUE;
         private Text word = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); //cumleni goturur file dan, 1 line i
            StringTokenizer tokenizer = new StringTokenizer(line); //cumleni sozlere bolur 
            while (tokenizer.hasMoreTokens()) {
                String s= tokenizer.nextToken();
                int val = s.length();
                if(val>max) {
                    max=val;
                    word.set(s);

                }
          }

        }

        public void cleanup(Context context) throws IOException, InterruptedException {    
            context.write(new LongWritable(max), word);    
        }
    }

  public static class IntSumReducer
       extends Reducer<LongWritable,Text,Text,LongWritable> {
    private IntWritable result = new IntWritable();
    int max=-100;
    public void reduce(LongWritable key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {

             context.write(new Text("longest"), key);

        //context.write(new Text("longest"),key);
      System.err.println(key);

    }
  }

  public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
       //job.setCombinerClass(IntSumReducer.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

  }
}

它在每次输入时找出一个单词的最长长度并打印出来。但是我需要在所有可能的输入文件中找到最长的长度,并且只打印一行。
所以输出是:
最长11
最长10
最长8
我只希望它包含:
最长11
谢谢

xlpyo6sf

xlpyo6sf1#

更改了查找最长单词长度的代码。现在输出只包含"最长 11"这一行。如果你有更好的方法,请随时纠正我的解决方案,因为我渴望学习最佳的做法

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class Map extends Mapper<Object, Text, Text, LongWritable> {
       // private final static IntWritable one = new IntWritable(1);
        int max = Integer.MIN_VALUE;
         private Text word = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString(); //cumleni goturur file dan, 1 line i
            StringTokenizer tokenizer = new StringTokenizer(line); //cumleni sozlere bolur 
            while (tokenizer.hasMoreTokens()) {
                String s= tokenizer.nextToken();
                int val = s.length();
                    if(val>max) {
                        max=val;
                        word.set(s);

                    context.write(word,new LongWritable(val)); 

          }

        }
        }

    }

  public static class IntSumReducer
       extends Reducer<Text,LongWritable,Text,LongWritable> {
    private LongWritable result = new LongWritable();
    long max=-100;
    public void reduce(Text key, Iterable<LongWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {

     // int sum = -1;

        for (LongWritable val : values) {
           if(val.get()>max) {
               max=val.get();

           }
          }
        result.set(max);

    }

    public void cleanup(Context context) throws IOException, InterruptedException {    
        context.write(new Text("longest"),result );   
    }
  }

  public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(Map.class);
        job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
      // job.setCombinerClass(IntSumReducer.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }

}

相关问题