Why doesn't this Hadoop example using a combiner class work as expected (the "local reduction" provided by the combiner is not performed)?

xkftehaa, posted 2021-05-29 in Hadoop

I am new to Hadoop and I am running some experiments, trying to use a combiner class to perform the reduce operation locally, on the same node as the mapper. I am using Hadoop 1.2.1.
So I have three classes:
1) WordCountWithCombiner.java:

// Learning MapReduce by Nitesh Jain
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

/*
 * Extends the Configured class and implements the Tool interface, so that
 * ToolRunner and GenericOptionsParser can handle generic options such as -D for us.
 */
public class WordCountWithCombiner extends Configured implements Tool{

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf(); 

    Job job = new Job(conf, "MyJob");   // Job is a "dashboard" with levers to control the execution of the job

    job.setJarByClass(WordCountWithCombiner.class);             // Name of the driver class into the jar
    job.setJobName("Word Count With Combiners");    // Set the name of the job

    FileInputFormat.addInputPath(job, new Path(args[0]));           // The input path is the first parameter of the main() method
    FileOutputFormat.setOutputPath(job, new Path(args[1]));         // The output path is the second parameter of the main() method

    job.setMapperClass(WordCountMapper.class);          // Set the mapper class

    /* Set the combiner: the combiner is a reducer run locally on the mapper node (we are reusing the previous
     * WordCountReducer class because it performs the same task, just locally to the mapper):
     */
    job.setCombinerClass(WordCountReducer.class);
    job.setReducerClass(WordCountReducer.class);        // Set the reducer class

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;

   }

  public static void main(String[] args) throws Exception {
    /* The ToolRunner object is used to trigger the run() method, which contains all the job execution logic.
     * It gives us the ability to set configuration properties at run time, so we do not have to write any code to handle them.
     */
    int exitCode = ToolRunner.run(new Configuration(), new WordCountWithCombiner(), args);
    System.exit(exitCode);
}

}

2) WordCountMapper.java:

// Learning MapReduce by Nitesh J.
// Word Count Mapper. 
import java.io.IOException;
import java.util.StringTokenizer;

// Import KEY AND VALUES DATATYPE:
import org.apache.hadoop.io.IntWritable;    // Similar to Integer
import org.apache.hadoop.io.LongWritable;   // Similar to Long
import org.apache.hadoop.io.Text;           // Similar to String

import org.apache.hadoop.mapreduce.Mapper;

/* Every mapper class extends the Hadoop Mapper class.
 * Type parameters:
 *   input key    (the byte offset of the line in the input file)
 *   input value  (the line of text)
 *   output key   (the word)
 *   output value (the count: one for each occurrence)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  /* Override the map() function defined by the extended Mapper class:
   * the parameter types have to match the ones declared in the Mapper type parameters above.
   * @param context: used to emit the output <key, value> pairs.
   *
   * Tokenize the line into words and write each word into the context, with the word as key and one (1) as value.
   */
  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);

      while (itr.hasMoreTokens()) {
          //just added the below line to convert everything to lower case 
          word.set(itr.nextToken().toLowerCase());
          // the following check verifies that the word starts with an alphabetic character
          if(Character.isAlphabetic((word.toString().charAt(0)))){
              context.write(word, one);
          }
    }
  }

}

3) WordCountReducer.java:

// Learning MapReduce by Nitesh Jain
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* Every reducer class has to extend the Hadoop Reducer class.
 * Type parameters:
 *   the mapper output key    (Text, the word)
 *   the mapper output value  (the number of occurrences of the related word: 1)
 *   the reducer output key   (the word)
 *   the reducer output value (the total number of occurrences of the related word)
 * They have to match the Mapper's output types.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /*
     * Override the reduce() function defined by the extended Reducer class.
     * @param key: the current word
     * @param values: an Iterable<IntWritable>, because the input of reduce() is a key and the list of values associated with that key
     * @param context: collects the output <key, value> pairs
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        context.write(key, new IntWritable(sum));
      }

}

So, as you can see in the WordCountWithCombiner driver class, I set the WordCountReducer class as the combiner, so that the reduction is performed directly on the mapper node, in this way:

job.setCombinerClass(WordCountReducer.class);

Then I have an input file on the Hadoop file system:

andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop fs -cat  in
to be or not to be

on which I want to operate.
If I run the previous job in the classic way, going through the two phases of map and reduce, it works fine; in fact, executing this statement in the Linux shell:

andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop jar WordCount.jar WordCountWithCombiner in out6

Hadoop does its job, and then I get the expected result:

andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop fs -cat  out6/p*
be  2
not 1
or  1
to  2
andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$

OK, it works fine.
The problem is that now I do not want to run the reduce phase, yet I expect the same result, since I have already set a combiner that performs the same operation on the mapper's node.
So, in the Linux shell, I execute the statement that excludes the reducer phase:

hadoop jar WordCountWithCombiner.jar WordCountWithCombiner -D mapred.reduce.tasks=0 in out7
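
(As a side note, the -D mapred.reduce.tasks=0 generic option is only picked up because the driver extends Configured and is launched through ToolRunner, which parses generic options into the Configuration returned by getConf(). A minimal sketch of an equivalent, purely programmatic way to make the job map-only, assuming the same run() method as above, would be:)

// Hypothetical alternative inside WordCountWithCombiner.run(), after the Job is created:
// this is equivalent to passing -D mapred.reduce.tasks=0 on the command line
job.setNumReduceTasks(0);   // 0 reducers: the map output is written directly to HDFS, no shuffle/sort phase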

But it does not work as expected, and this is what I get (I am posting the whole output to add more information about what is happening):

andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop jar WordCountWithCombiner.jar WordCountWithCombiner -D mapred.reduce.tasks=0 in out7
16/02/13 19:43:44 INFO input.FileInputFormat: Total input paths to process : 1
16/02/13 19:43:44 INFO util.NativeCodeLoader: Loaded the native-hadoop library
16/02/13 19:43:44 WARN snappy.LoadSnappy: Snappy native library not loaded
16/02/13 19:43:45 INFO mapred.JobClient: Running job: job_201601242121_0008
16/02/13 19:43:46 INFO mapred.JobClient:  map 0% reduce 0%
16/02/13 19:44:00 INFO mapred.JobClient:  map 100% reduce 0%
16/02/13 19:44:05 INFO mapred.JobClient: Job complete: job_201601242121_0008
16/02/13 19:44:05 INFO mapred.JobClient: Counters: 19
16/02/13 19:44:05 INFO mapred.JobClient:   Job Counters 
16/02/13 19:44:05 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=18645
16/02/13 19:44:05 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
16/02/13 19:44:05 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
16/02/13 19:44:05 INFO mapred.JobClient:     Launched map tasks=1
16/02/13 19:44:05 INFO mapred.JobClient:     Data-local map tasks=1
16/02/13 19:44:05 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
16/02/13 19:44:05 INFO mapred.JobClient:   File Output Format Counters 
16/02/13 19:44:05 INFO mapred.JobClient:     Bytes Written=31
16/02/13 19:44:05 INFO mapred.JobClient:   FileSystemCounters
16/02/13 19:44:05 INFO mapred.JobClient:     HDFS_BYTES_READ=120
16/02/13 19:44:05 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=55503
16/02/13 19:44:05 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=31
16/02/13 19:44:05 INFO mapred.JobClient:   File Input Format Counters 
16/02/13 19:44:05 INFO mapred.JobClient:     Bytes Read=19
16/02/13 19:44:05 INFO mapred.JobClient:   Map-Reduce Framework
16/02/13 19:44:05 INFO mapred.JobClient:     Map input records=1
16/02/13 19:44:05 INFO mapred.JobClient:     Physical memory (bytes) snapshot=93282304
16/02/13 19:44:05 INFO mapred.JobClient:     Spilled Records=0
16/02/13 19:44:05 INFO mapred.JobClient:     CPU time spent (ms)=2870
16/02/13 19:44:05 INFO mapred.JobClient:     Total committed heap usage (bytes)=58195968
16/02/13 19:44:05 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=682741760
16/02/13 19:44:05 INFO mapred.JobClient:     Map output records=6
16/02/13 19:44:05 INFO mapred.JobClient:     SPLIT_RAW_BYTES=101
andrea@andrea-virtual-machine:~/workspace/HadoopExperiment/bin$ hadoop fs -cat  out7/p*
to  1
be  1
or  1
not 1
to  1
be  1

So, as you can see, the local reduction provided by the combiner does not seem to work.
Why? What am I missing? How can I solve this?
Thanks.

x0fgdtte (answer #1):

Do not assume that the combiner will run. Treat the combiner purely as an optimization. The combiner is not guaranteed to run over all of your data. In some cases, when the data does not need to be spilled to disk, MapReduce will skip using the combiner entirely. Note also that the combiner may be run multiple times over subsets of the data: once per spill.
So setting the number of reducers to 0 does not actually mean you will get the correct result, because the combiner has not covered all of the mapper's data.
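
If you need the per-mapper aggregation to be guaranteed, either keep the reduce phase or do the aggregation inside the mapper itself. Below is a minimal sketch of the "in-mapper combining" pattern applied to the same word count logic as above; the class name WordCountInMapperCombiningMapper is made up for illustration, and it only relies on the standard Mapper map() and cleanup() hooks:

// Sketch only: an "in-mapper combining" mapper that guarantees local aggregation,
// unlike a combiner, which the framework is free to skip or to run only on spilled data.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountInMapperCombiningMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  // Per-task map of word -> count, accumulated across all the input records of this map task
  private final Map<String, Integer> counts = new HashMap<String, Integer>();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      String word = itr.nextToken().toLowerCase();
      if (Character.isAlphabetic(word.charAt(0))) {
        Integer current = counts.get(word);
        counts.put(word, current == null ? 1 : current + 1);
      }
    }
  }

  // cleanup() is called exactly once at the end of each map task,
  // so the aggregated counts are always emitted, with or without reducers.
  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    for (Map.Entry<String, Integer> e : counts.entrySet()) {
      context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
    }
  }
}

With a mapper like this, running the map-only job (mapred.reduce.tasks=0) would still give you one aggregated count per word for each map task.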
