Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.IntWritable

o8x7eapl · posted 2021-06-01 in Hadoop
Follow (0) | Answers (1) | Views (467)

I am trying to run a MapReduce job in Java on a comma-separated file containing data from an airline crash.
The data has the following columns; a sample of the data is shown below:

passengerid,survived(s=0,d=1),pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked
1,0,3,"Braund Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S,2,1,1,"Cumings Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C,
3,1,3,"Heikkinen Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S,
4,1,1,"Futrelle Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S,
5,0,3,"Allen Mr. William Henry",male,35,0,0,373450,8.05,,S,
6,0,3,"Moran Mr. James",male,,0,0,330877,8.4583,,Q,
7,0,1,"McCarthy Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S,
8,0,3,"Palsson Master. Gosta Leonard",male,2,3,1,349909,21.075,,S,
9,1,3,"Johnson Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S,
10,1,2,"Nasser Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C,
11,1,3,"Sandstrom Miss. Marguerite Rut",female,4,1,1,PP 9549,16.7,G6,S,
12,1,1,"Bonnell Miss. Elizabeth",female,58,0,0,113783,26.55,C103,S,  
...

My goal is to find the average age of the people who died in this crash. Below are my code snippets and the error I am getting:
Airline/airlineDriver.java:

package Airline;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import Airline.airlineMapper;
import Airline.airlineReducer;

public class airlineDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();

        Job j = Job.getInstance(conf);
        j.setJobName("Airline Job");
        j.setJarByClass(airlineDriver.class);
        j.setMapperClass(airlineMapper.class);
        j.setNumReduceTasks(2);
        j.setReducerClass(airlineReducer.class);
        j.setMapOutputKeyClass(IntWritable.class);  // must match the key type the mapper writes
        j.setMapOutputValueClass(Text.class);       // must match the value type the mapper writes
        j.setOutputKeyClass(Text.class);            // key type the reducer writes
        j.setOutputValueClass(FloatWritable.class); // value type the reducer writes

        FileInputFormat.addInputPath(j, new Path(args[0]));
        FileOutputFormat.setOutputPath(j, new Path(args[1]));

        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}

Airline/airlineMapper.java:

package Airline;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class airlineMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String inputstring = value.toString();  // convert the input Text value to a String
        IntWritable resKEY = new IntWritable();
        Text resVALUE = new Text();
        String str[] = inputstring.split(",");  // split the line into an array of fields
        int bool = Integer.parseInt(str[1]);    // fetch survived (s=0) or dead (d=1)
        if (bool == 1) {
            resVALUE.set(str[5]);               // age field
            resKEY.set(bool);
            context.write(resKEY, resVALUE);    // write the key-value pair to the partitioner and reducer
        }
    }
}

Airline/airlineReducer.java:

package Airline;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class airlineReducer extends Reducer<IntWritable, Text, Text, FloatWritable> {

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        float y = 0;       // numerator for avgage
        float avgage = 0;
        int counter = 0;   // denominator for avgage
        String a = "Average age";
        for (Text x : values) {                         // process each value one by one
            String z = x.toString();                    // convert Text to String
            if ((!z.equals("")) && (!z.equals(null))) { // eliminate empty strings (**possible source of error;
                                                        // note z.equals(null) is always false here, a real null check would be z != null)
                y += Float.parseFloat(z);               // parse the age as float since the csv contains floating-point ages
                counter++;                              // count the total number of people (records)
            } else {
                continue; // if a null or empty string is encountered, skip this record,
                          // in case some fields were left empty (**possible source of error)
            }
        }
        avgage = y / counter;                                   // average formula
        context.write(new Text(a), new FloatWritable(avgage)); // write the output data
    }
}

Even after repeatedly modifying the code, I still get errors like the following:

Error: java.lang.ArrayIndexOutOfBoundsException: 1
at Airline.airlineMapper.map(airlineMapper.java:18)
at Airline.airlineMapper.map(airlineMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
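
From the trace, the ArrayIndexOutOfBoundsException: 1 means that for at least one input line the array returned by split(",") has fewer than two elements, which is exactly what a blank line produces (Java's String.split also discards trailing empty tokens). A minimal guard at the top of map, sketched here against the same str variable used above, would skip such lines before str[1] is ever read:

String[] str = value.toString().split(",");
if (str.length < 12) {   // blank, truncated, or otherwise malformed line
    return;              // skip it instead of indexing past the end of the array
}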

plz help

ygya80vv · Answer 1

As @rameshmaharjan answered, the correct mapper and reducer classes should be:
Airline/airlineMapper.java (my earlier code is also commented out below it; the checks str.length==12 and str[5].matches("\\d+") can be combined with that earlier code and should work):

package Airline;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class airlineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text gender = new Text();
    IntWritable age = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String inputstring = value.toString();
        String str[] = inputstring.split(",");
        if (str.length == 12) {                  // skip blank/short lines that caused the ArrayIndexOutOfBoundsException
            if (str[1].equals("0")) {            // keep only records whose survived field is "0"
                if (str[5].matches("\\d+")) {    // keep only records whose age field is a whole number
                    gender.set(str[4]);
                    age.set(Integer.parseInt(str[5]));
                    context.write(gender, age);  // emit only inside the checks, so no stale or invalid pair is written
                }
            }
        }

        // String inputstring = value.toString();
        // String[] str = inputstring.split(",");
        // IntWritable resKEY = new IntWritable();
        // Text resVALUE = new Text();
        //
        // int bool = Integer.parseInt(str[1]);
        // if (bool == 1 && str[5].length() >= 1){
        //     resVALUE.set(str[5]);
        //     resKEY.set(bool);
        //     context.write(resKEY,resVALUE);
        // }
    }
}
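
One caveat about the str[5].matches("\\d+") check above: it only accepts whole-number ages, while the comment in the original reducer noted that the csv contains floating-point ages, so a record with an age like 0.42 would be silently dropped. If such records should be counted as well, a sketch of a widened check (not part of the original answer, and reusing the same str, gender, and age variables) could be:

if (str[5].matches("\\d+(\\.\\d+)?")) {       // also accept fractional ages such as "0.42"
    gender.set(str[4]);
    age.set((int) Float.parseFloat(str[5]));  // truncate to int so the Text/IntWritable output types stay unchanged
    context.write(gender, age);
}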

Airline/airlineReducer.java (an important point about my earlier MapReduce code is that I tried to validate the strings in the reducer phase, when this should be done in the mapper phase itself, as @rameshmaharjan pointed out earlier):

package Airline;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class airlineReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int total_people = 0;
        for (IntWritable val : values) {
            total_people += 1;     // count the records for this key
            sum += val.get();      // accumulate the ages
        }
        sum = sum / total_people;  // integer average age
        context.write(key, new IntWritable(sum));

        // float y = 0;
        // float avgage = 0;
        // int counter = 0;
        // String a = "Average age";
        // Text resKEY = new Text();
        // FloatWritable resVALUE = new FloatWritable();
        // for (Text x : values) {
        //     String z = x.toString();
        //     y += Float.parseFloat(z);
        //     counter++;
        // }
        // avgage = y / counter;
        // resKEY.set(a);
        // resVALUE.set(avgage);
        // context.write(resKEY, resVALUE);
    }
}
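
Finally, note that the driver from the question still declares j.setMapOutputKeyClass(IntWritable.class) and j.setMapOutputValueClass(Text.class). The "Type mismatch in key from map" error in the title is raised precisely when the key type the mapper actually writes differs from what setMapOutputKeyClass declares, so with this Text/IntWritable mapper and reducer the corresponding driver lines should become something like:

j.setMapOutputKeyClass(Text.class);          // the mapper now emits Text keys (gender)
j.setMapOutputValueClass(IntWritable.class); // and IntWritable values (age)
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);    // the reducer now writes IntWritable averages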
