mapreduce组合器

rryofs0p  于 2021-06-04  发布在  Hadoop
关注(0)|答案(4)|浏览(332)

我有一个简单的mapreduce代码与mapper,reducer和combiner。Map器的输出被传递到组合器。但是对于减速机,不是来自组合器的输出,而是来自Map器的输出。
请帮忙
代码:

package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AverageSalary
{
public static class Map extends  Mapper<LongWritable, Text, Text, DoubleWritable> 
{
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
    {    
        String[] empDetails= value.toString().split(",");
        Text unit_key = new Text(empDetails[1]);      
        DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
        context.write(unit_key,salary_value);    

    }  
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text> 
{
    public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
    {
        String val;
        double sum=0;
        int len=0;
        while (values.iterator().hasNext())
        {
            sum+=values.iterator().next().get();
            len++;
        }
        val=String.valueOf(sum)+":"+String.valueOf(len);
        try {
            context.write(key,new Text(val));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
public static class Reduce extends Reducer<Text,Text, Text,Text> 
{
    public void reduce (final Text key, final Text values, final Context context)
    {
        //String[] sumDetails=values.toString().split(":");
        //double average;
        //average=Double.parseDouble(sumDetails[0]);
        try {
            context.write(key,values);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
public static void main(String args[])
{
    Configuration conf = new Configuration();
    try
    {
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();    
     if (otherArgs.length != 2) {      
         System.err.println("Usage: Main <in> <out>");      
         System.exit(-1);    }    
     Job job = new Job(conf, "Average salary");    
     //job.setInputFormatClass(KeyValueTextInputFormat.class);    
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));    
     job.setJarByClass(AverageSalary.class);    
     job.setMapperClass(Map.class);    
     job.setCombinerClass(Combiner.class);
     job.setReducerClass(Reduce.class);    
     job.setOutputKeyClass(Text.class);    
     job.setOutputValueClass(Text.class);    

        System.exit(job.waitForCompletion(true) ? 0 : -1);
    } catch (ClassNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

7y4bm7vi

7y4bm7vi1#

Combiner 当你跑的时候不会一直工作 mapreduce .
如果至少有三个溢出文件(Map器的输出写入本地磁盘),组合器将执行,这样可以减小文件的大小,以便可以轻松地将其传输到减少节点。
组合器需要运行的溢出数可以设置为通过 min.num.spills.for.combine 财产

pkmbmrz7

pkmbmrz72#

似乎你忘记了组合器的重要特性:
键/值的输入类型和键/值的输出类型必须相同。
你不能接受 Text/DoubleWritable 并返回一个 Text/Text . 我建议你用 Text 相反 DoubleWritable ,并在内部进行适当的解析 Combiner .

zlhcx6iw

zlhcx6iw3#

合路器的#1规则是:不要假设合路器将运行。仅将合并器视为优化。
合并器不能保证运行您的所有数据。在某些情况下,当数据不需要溢出到磁盘时,mapreduce将完全跳过使用合并器。还要注意,组合器可以在数据的子集上运行多次!每次泄漏一次。
对你来说,你是在做一个错误的假设。你应该在组合器和减速机中求和。
此外,您还应该遵循@user987339的答案。组合器的输入和输出需要相同(text,double->text,double),并且需要与Map器的输出和减速机的输入匹配。

2w3kk1z5

2w3kk1z54#

如果使用combine函数,那么它与reduce函数的形式相同(并且是reducer的一个实现),只是它的输出类型是中间键和值类型(k2和v2),所以它们可以提供reduce函数:map:(k1,v1)→ 列表(k2,v2)合并:(k2,列表(v2))→ 列表(k2,v2)减少:(k2,列表(v2))→ list(k3,v3)通常combine和reduce函数是相同的,在这种情况下,k3与k2相同,v3与v2相同。

相关问题