hadoop mapreduce解析csv时出错

hsvhsicv  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(270)

在解析csv文件时,map函数中出现以下错误。

14/07/15 19:40:05 INFO mapreduce.Job: Task Id : attempt_1403602091361_0018_m_000001_2, Status : FAILED
Error: java.lang.ArrayIndexOutOfBoundsException: 4
        at com.test.mapreduce.RetailCustomerAnalysis_2$MapClass.map(RetailCustomerAnalysis_2.java:55)
        at com.test.mapreduce.RetailCustomerAnalysis_2$MapClass.map(RetailCustomerAnalysis_2.java:1)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:429)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

map函数如下所示

package com.test.mapreduce;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RetailCustomerAnalysis_2 extends Configured implements Tool {
     public static class MapClass extends MapReduceBase
     implements Mapper<Text, Text, Text, Text> {

          private Text key1 = new Text();
          private Text value1 = new Text();

     public void map(Text key, Text value,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {

         String line = value.toString();
         String[] split = line.split(",");

         key1.set(split[0].trim()); 
         /* line no 55 where error is occuring */
         value1.set(split[4].trim()); 

         output.collect(key1, value1);
     }
 }

 public int run(String[] args) throws Exception {
     Configuration conf = getConf();

     JobConf job = new JobConf(conf, RetailCustomerAnalysis_2.class);

     Path in = new Path(args[0]);
     Path out = new Path(args[1]);
     FileInputFormat.setInputPaths(job, in);
     FileOutputFormat.setOutputPath(job, out);

     job.setJobName("RetailCustomerAnalysis_2");
     job.setMapperClass(MapClass.class);
     job.setReducerClass(Reduce.class);

     job.setInputFormat(KeyValueTextInputFormat.class);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
    // job.set("key.value.separator.in.input.line", ",");

     JobClient.runJob(job);

     return 0;
 }

 public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), new RetailCustomerAnalysis_2(), args);

     System.exit(res);
 }

}

用于运行此代码的示例输入如下

PRAVEEN,4002012,Kids,02GK,7/4/2010
PRAVEEN,400201,TOY,020383,14/04/2014

我使用以下命令和输入运行应用程序。

yarn jar RetailCustomerAnalysis_2.jar com.test.mapreduce.RetailCustomerAnalysis_2 /hduser/input5 /hduser/output5
dgiusagp

dgiusagp1#

split[]仅包含split[0]、split[1]、split[2]和split[3]元素

z9smfwbn

z9smfwbn2#

添加检查以查看输入行是否定义了所有字段或忽略处理它的Map函数。在新的api中,代码应该是这样的。

// Skip this record if it does not contain the expected number of fields,
// preventing an ArrayIndexOutOfBoundsException on later split[i] access.
if(split.length!=noOfFields){
                        return;
        }

另外,如果您还感兴趣,可以设置hadoop counter来统计csv文件中总共有多少行包含所有必需的字段。

// Same guard as above, but also increments a Hadoop counter so the job
// report shows how many malformed rows were discarded.
if(split.length!=noOfFields){
            context.getCounter(MTJOB.DISCARDED_ROWS_DUE_MISSING_FIELDS)
            .increment(1);
            return;
        }
ff29svar

ff29svar3#

对于KeyValueTextInputFormat,分隔符前面的第一个字符串被视为键,行的其余部分被视为值。分隔符字节(逗号或制表符等)用于分隔每条记录中的键和值。
在您的代码中,第一个逗号之前的字符串作为键,行的其余部分作为值。拆分值时,其中只有4个字符串。因此,字符串数组只能从split[0]到split[3],而不存在split[4]。
欢迎任何建议或更正。

相关问题