我是mapreduce的新手,我对mapper类和reducer类设计的这段代码有些怀疑
我熟悉map side加入mapreduce的过程,我学到了以下几点:
public static class CustsMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
在这里,在上面的代码片段中,我了解到我们将类扩展到 Mapper
类和作为 Object
是一把钥匙 Text
是一个值,所以map方法使用 context
对象在这里作为一个输出 context.write(new Text(), new Text())
根据代码的逻辑体设计。
我的两个问题是:
为什么我们要把班级扩大到 MapReduceBase
(它做什么?)以及为什么我们要实现我们的类 Mapper
(我知道它是一个类,但在web上的某个地方它显示为一个接口,所以如果我将它扩展到 org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
班上有什么问题?
在 map
功能什么 OutputCollector<Text, IntWritable> output, Reporter reporter
?? 我不知道?我就知道 Context context
应该在这里,但什么是 OutputCollector
以及 Reporter
在这里?
我在做下面的程序:
输入:
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45
代码:
package hadoop;
import java.util.*;
import java.io.IOException;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class ProcessUnits
{
//Mapper class
public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,Text,Text,IntWritable>
{
//Map function
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
String line = value.toString();
String lasttoken = null;
StringTokenizer s = new StringTokenizer(line,"\t");
String year = s.nextToken();
while(s.hasMoreTokens())
{
lasttoken=s.nextToken();
}
int avgprice = Integer.parseInt(lasttoken);
output.collect(new Text(year), new IntWritable(avgprice));
}
}
//Reducer class
public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable>
{
//Reduce function
public void reduce( Text key, Iterator <IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
{
int maxavg=30;
int val=Integer.MIN_VALUE;
while (values.hasNext())
{
if((val=values.next().get())>maxavg)
{
output.collect(key, new IntWritable(val));
}
}
}
}
//Main function
public static void main(String args[])throws Exception
{
JobConf conf = new JobConf(ProcessUnits.class);
conf.setJobName("max_eletricityunits");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(E_EMapper.class);
conf.setCombinerClass(E_EReduce.class);
conf.setReducerClass(E_EReduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
输出:
1981 34
1984 40
1985 45
1条答案
按热度按时间33qvvth11#
为什么我们将类扩展到mapreducebase(它做什么?),为什么我们将类实现为mapper
因为这是Hadoop2.x出现之前用MapredAPI编写的旧代码。
我知道上下文应该在这里,但是outputcollector和reporter在这里是什么
它是上下文对象的早期版本。
hadoop:outputcollector在mapreduce期间是如何工作的?
outputcollector如何工作?