mapreduce字数示例

cygmwpex  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(462)

我尝试使用outputcollector运行基本wordcount mapreduce示例,但遇到异常。
info mapreduce.job:job job\u local1048833344\u 0001失败,状态为failed,原因是:na java.lang.exception:java.io.ioexception:map中的键类型不匹配:应为org.apache.hadoop.io.text,收到org.apache.hadoop.io.longwritable。。。
下面是我尝试运行的代码:

import java.io.*;
import java.util.StringTokenizer;
import java.util.Iterator; 

import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.io.ObjectWritable; 
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountOutputCollector {

    public static class WordCountOutputCollectorMapper  extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class WordCountOutputCollectorReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count outputcollector");
        job.setJarByClass(WordCountOutputCollector.class);
        job.setMapperClass(WordCountOutputCollectorMapper.class);
        job.setCombinerClass(WordCountOutputCollectorReducer.class);
        job.setReducerClass(WordCountOutputCollectorReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //conf.setInputFormat(TextInputFormat.class);
        //conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));    
        //JobClient.runJob(conf);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
niknxzdl

niknxzdl1#

我认为这主要是因为Map输出没有被转换成文本。
尝试取消对以下代码的注解:

conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    JobClient.runJob(conf);
6za6bjd0

6za6bjd02#

试试这个:hadoop wordcount

import java.io.IOException;
    import java.io.PrintStream;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Reducer.Context;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class WordCount
    {
      public static class Map
        extends Mapper<LongWritable, Text, Text, IntWritable>
      {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable paramLongWritable, Text paramText, Mapper<LongWritable, Text, Text, IntWritable>.Context paramMapper)
          throws IOException, InterruptedException
        {
          StringTokenizer localStringTokenizer = new StringTokenizer(paramText.toString());
          while (localStringTokenizer.hasMoreTokens())
          {
            this.word.set(localStringTokenizer.nextToken());
            paramMapper.write(this.word, one);
          }
        }
      }

      public static class Reduce
        extends Reducer<Text, IntWritable, Text, IntWritable>
      {
        private IntWritable result = new IntWritable();

        public void reduce(Text paramText, Iterable<IntWritable> paramIterable, Reducer<Text, IntWritable, Text, IntWritable>.Context paramReducer)
          throws IOException, InterruptedException
        {
          int i = 0;
          for (IntWritable localIntWritable : paramIterable) {
            i += localIntWritable.get();
          }
          this.result.set(i);

          paramReducer.write(paramText, this.result);
        }
      }

      public static void main(String[] paramArrayOfString)
        throws Exception
      {
        Configuration localConfiguration = new Configuration();
        String[] arrayOfString = new GenericOptionsParser(localConfiguration, paramArrayOfString).getRemainingArgs();
        if (arrayOfString.length != 2)
        {
          System.err.println("Usage: WordCount <in> <out>");
          System.exit(2);
        }
        Job localJob = new Job(localConfiguration, "wordcount");
        localJob.setJarByClass(WordCount.class);
        localJob.setMapperClass(WordCount.Map.class);
        localJob.setReducerClass(WordCount.Reduce.class);

        localJob.setCombinerClass(WordCount.Reduce.class);

        localJob.setOutputKeyClass(Text.class);

        localJob.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(localJob, new Path(arrayOfString[0]));

        FileOutputFormat.setOutputPath(localJob, new Path(arrayOfString[1]));

        System.exit(localJob.waitForCompletion(true) ? 0 : 1);
      }
    }

相关问题