Stopping the reduce function in Hadoop

sr4lhrrt · posted 2021-06-03 in Hadoop

I have a reduce function, and I want it to stop after processing some n keys. I increment a counter for each key and return from the reduce function once the condition is met.
Here is the code:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class wordcount {

    public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private IntWritable leng = new IntWritable();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                String lword = tokenizer.nextToken();
                leng.set(lword.length());
                context.write(leng, one);   // emit (word length, 1)
            }
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        int count = 0;

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
                count++;
            }
            context.write(key, new IntWritable(sum));
            if (count > 19) return;   // intended early exit, but only ends the current call
        }
    }
}

Is there another way I can do this?

pkln4tw6 #1

Returning from reduce() only ends the call for the current key; the framework still invokes reduce() once for every remaining key. Instead, you can override the run() method of the Reducer class (new API), which drives that per-key loop:

public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    // reduce() method here

    // Override run(), which decides how reduce() is called for each key
    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        int count = 0;
        while (context.nextKey()) {
            if (count++ < n) {   // n = number of keys to process
                reduce(context.getCurrentKey(), context.getValues(), context);
            } else {
                break;   // exit or do whatever you want
            }
        }
        cleanup(context);
    }
}
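
For completeness, here is a minimal sketch of a driver that would wire the Mapper from the question and the run()-overriding Reduce above into a job. None of it appears in the original post: the class name WordLengthDriver, the job name, and reading the input/output paths from command-line arguments are all assumptions, and n must first be given a concrete value for the Reduce class to compile.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver class, not part of the original question or answer.
public class WordLengthDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word length count");
        job.setJarByClass(wordcount.class);
        job.setMapperClass(wordcount.Map.class);
        job.setReducerClass(wordcount.Reduce.class); // the Reduce with the overridden run()
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Since the map output types (IntWritable, IntWritable) match the job's final output types, separate setMapOutputKeyClass()/setMapOutputValueClass() calls are not needed here.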
