我对 Hadoop 还很陌生,只做过一些基础的 MapReduce 程序实验。我遇到了一个奇怪的问题:我注意到 Mapper 的输出绕过了 Reducer,直接被写进了输出文件。
代码如下
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
class WeatherRecord implements Writable {
public DoubleWritable maxSum; // running sum of TMAX records
public IntWritable maxCount; // running count of TMAX records
public DoubleWritable minSum; // running sum of TMIN records
public IntWritable minCount; // running count of TMIN records
// default constructor
public WeatherRecord(){
maxSum = new DoubleWritable();
maxCount= new IntWritable();
minSum = new DoubleWritable();
minCount = new IntWritable();
}
// custom constructor
public WeatherRecord(DoubleWritable ms, IntWritable mc, DoubleWritable ms1, IntWritable mc1){
maxSum = ms;
maxCount= mc;
minSum = ms1;
minCount = mc1;
}
/* Getter and setter Methods*/
//method to get running total of temperature
public double getMaxSum(){
return Double.parseDouble(maxSum.toString());
}
//method to get running total of temperature
public double getMinSum(){
return Double.parseDouble(minSum.toString());
}
//method to get Count
public int getMaxCount(){ return Integer.parseInt(maxCount.toString());}
//method to get Count
public int getMinCount(){ return Integer.parseInt(minCount.toString());}
// method to set count
public void setMaxCount(int c){
maxCount = new IntWritable(c);
}
// method to set count
public void setMinCount(int c){
minCount = new IntWritable(c);
}
//method to set reading sum
public void setMaxSum(double r){
maxSum = new DoubleWritable(r);
}
//method to set reading sum
public void setMinSum(double r){
minSum = new DoubleWritable(r);
}
// method to serialize object
public void write(DataOutput dataOutput) throws IOException {
maxSum.write(dataOutput);
maxCount.write(dataOutput);
minSum.write(dataOutput);
minCount.write(dataOutput);
}
//method to deserialize object
public void readFields(DataInput dataInput) throws IOException {
maxSum.readFields(dataInput);
maxCount.readFields(dataInput);
minSum.readFields(dataInput);
minCount.readFields(dataInput);
}
}
public class WeatherDriver extends Configured implements Tool{
public static class WeatherMap extends Mapper<LongWritable, Text, Text,WeatherRecord > {
HashMap<String,WeatherRecord> recordMap= new HashMap<String,WeatherRecord>();
protected void map(LongWritable key, Text value, Mapper.Context context) {
//the individual records from csv file is split based on ','
String[] record = value.toString().split(",");
//station-id is the first field in the file
String stationId = record[0];
//record-type(TMAX,TMIN,..) is the third field in the csv file
String type = record[2];
//temperature readings are fourth column in the csv file
double temperature = Double.parseDouble(record[3]);
if(type.equalsIgnoreCase("TMAX") || type.equalsIgnoreCase("TMIN")){
if(recordMap.containsKey(stationId)){
WeatherRecord w = recordMap.get(stationId);
if(type.equalsIgnoreCase("TMAX")){
w.setMaxCount(1 + w.getMaxCount());
w.setMaxSum(w.getMaxSum() + temperature);
}
else if(type.equalsIgnoreCase("TMIN")){
w.setMinCount(1+w.getMinCount());
w.setMinSum(w.getMinSum() + temperature);
}
recordMap.put(stationId,w);
}
else{
if(type.equalsIgnoreCase("TMAX")){
recordMap.put(stationId, new WeatherRecord(new DoubleWritable(temperature), new IntWritable(1),
new DoubleWritable(0), new IntWritable(0)));
}
else if(type.equalsIgnoreCase("TMIN")){
recordMap.put(stationId, new WeatherRecord(new DoubleWritable(0), new IntWritable(0),
new DoubleWritable(temperature), new IntWritable(1)));
}
}
}
} // end of map method
protected void cleanup(Context context) throws IOException, InterruptedException {
Iterator i = recordMap.keySet().iterator();
String stationId="";
while(i.hasNext()){
stationId = i.next().toString();
context.write(new Text(stationId),recordMap.get(stationId));
}
} // end of cleanup
} // end of mapper class
public static class WeatherReduce extends Reducer<Text, WeatherRecord, Text, Text> {
protected void reduce(Text key, Iterator<WeatherRecord> values, Reducer<Text, WeatherRecord, Text, Text>.Context context) throws IOException, InterruptedException {
// initializing local variables to compute average
int maxCount =0;
int minCount=0;
double maxSum=0;
double minSum=0;
//iterating over list of values to compute average
while(values.hasNext()){
WeatherRecord record = values.next();
maxSum += Double.parseDouble(record.maxSum.toString());
maxCount += Integer.parseInt(record.maxCount.toString());
minSum += Double.parseDouble(record.minSum.toString());
minCount+=Integer.parseInt(record.minCount.toString());
}
// logic to handle divide by zero case
if(minCount==0){
minCount=1;
}
if(maxCount==0){
maxCount=1;
}
System.out.println("Min Sum is" + minSum);
context.write(new Text(key), new Text(","+(minSum/minCount)+","+(maxSum/maxCount)));
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
args = new GenericOptionsParser(conf, args).getRemainingArgs();
String input = args[0];
String output = args[1];
Job job = new Job(conf, "weather average");
job.setJarByClass(WeatherMap.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WeatherMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WeatherRecord.class);
job.setReducerClass(WeatherReduce.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(input));
Path outPath = new Path(output);
FileOutputFormat.setOutputPath(job, outPath);
outPath.getFileSystem(conf).delete(outPath, true);
job.waitForCompletion(true);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new WeatherDriver(), args);
System.exit(exitCode);
}
}
预期输出应为 station_id、average_min_temp、average_max_temp,例如:
AGE00135039,123.12,11
但是,我实际得到的却是下面这样的输出。分析代码后,我发现 Mapper 中 context.write 的结果被直接写进了输出文件:
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00135039 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
AGE00147704 WeatherRecord@7df8df85
1条答案
按热度按时间tp5buhyn1#
你的作业很可能执行的是 Reducer 基类默认的(恒等)reduce() 方法,因为你自己写的 reduce() 方法签名不正确,并没有真正重写基类方法。你写的是:

protected void reduce(Text key, Iterator<WeatherRecord> values, Context context)

而它应该是:

protected void reduce(Text key, Iterable<WeatherRecord> values, Context context)

注意第二个参数的类型要从 Iterator 改为 Iterable。避免这类错误的一个办法是:给你认为重写了基类实现的方法加上 @Override 注解——如果该方法实际上没有重写任何基类方法,编译器会直接报错。