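I am trying to run a MapReduce job in Java on a comma-separated file that contains data from an airline crash.
The data has the following columns, and a few sample rows are included: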
passengerid,survived(s=0,d=1),pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked
1,0,3,"Braund Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S,
2,1,1,"Cumings Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C,
3,1,3,"Heikkinen Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S,
4,1,1,"Futrelle Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S,
5,0,3,"Allen Mr. William Henry",male,35,0,0,373450,8.05,,S,
6,0,3,"Moran Mr. James",male,,0,0,330877,8.4583,,Q,
7,0,1,"McCarthy Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S,
8,0,3,"Palsson Master. Gosta Leonard",male,2,3,1,349909,21.075,,S,
9,1,3,"Johnson Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S,
10,1,2,"Nasser Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C,
11,1,3,"Sandstrom Miss. Marguerite Rut",female,4,1,1,PP 9549,16.7,G6,S,
12,1,1,"Bonnell Miss. Elizabeth",female,58,0,0,113783,26.55,C103,S,
...
My goal is to find the average age of the people who died in this crash. Below are my code snippets and the error I get:
Airline.airlineDriver.java file:
package Airline;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import Airline.airlineMapper;
import Airline.airlineReducer;
public class airlineDriver {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException, URISyntaxException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job j = Job.getInstance(conf);// getConf()
j.setJobName("Airline Job");
j.setJarByClass(airlineDriver.class );
j.setMapperClass(airlineMapper.class );
j.setNumReduceTasks(2);
j.setReducerClass(airlineReducer.class);
j.setMapOutputKeyClass(IntWritable.class);
j.setMapOutputValueClass(Text.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(j, new Path(args[0]));
FileOutputFormat.setOutputPath(j, new Path(args[1]));
System.exit(j.waitForCompletion(true) ? 0 : 1);
}
}
Airline.airlineMapper.java file:
package Airline;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class airlineMapper extends Mapper<LongWritable, Text,
IntWritable,Text> {
@Override
protected void map(LongWritable key, Text
value,org.apache.hadoop.mapreduce.Mapper.Context context)throws IOException,
InterruptedException {
String inputstring = value.toString(); //converts input Text value to String
IntWritable resKEY = new IntWritable();
Text resVALUE = new Text();
String str[] = inputstring.split(","); //splits it into array
int bool = Integer.parseInt(str[1]); //fetch survived(s=0) or dead(d=1)
if (bool == 1){
resVALUE.set(str[5]);
resKEY.set(bool);
context.write(resKEY,resVALUE); //write key value pair to partitioner and reducer
} }
}
Airline.airlineReducer.java file:
package Airline;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class airlineReducer extends Reducer<IntWritable,Text, Text,
FloatWritable> {
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
float y = 0; //numerator for avgage
float avgage =0;
int counter = 0; //denominator for avgage
String a = "Average age";
for(Text x : values) //passes value to x one-by-one
{
String z = x.toString(); //converts text to string
if((!z.equals(""))&&(!z.equals(null))){ //eliminating any empty string (**possible source of error)
y +=Float.parseFloat(z); //converting age to float since the CSV contains floating point ages
counter++; //incrementing counter for total no. of people(records)
}else{continue; //if a null or empty string is encountered loop may skip the iteration and continue to next record
//in case few fields are left empty
}//**possible source of error
}
avgage = y/counter; //average formula
context.write(new Text(a), new FloatWritable(avgage) ); //writing output data
}
}
Even after modifying the code repeatedly, I still get the following error:
Error: java.lang.ArrayIndexOutOfBoundsException: 1
at Airline.airlineMapper.map(airlineMapper.java:18)
at Airline.airlineMapper.map(airlineMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs
(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Please help.
1 answer
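As @rameshmaharjan answered, the correct mapper and reducer classes should be:
Airline.airlineMapper (the code I posted earlier is also given below, with comments. The checks
str.length==12
and str[5].matches("\\d+")
can be added to the earlier code and should work):
Airline.airlineReducer (the important point about the earlier MapReduce code is that I tried to validate the string in the reducer phase, when this should be done in the mapper phase itself, as @rameshmaharjan pointed out earlier):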