HDFS: How do I fix this error when running a MapReduce application?

Asked by dsekswqp on 2022-12-09, in HDFS

I am new to MapReduce and currently working through a MapReduce exercise. I was given a very large dataset of US patents granted between January 1963 and December 1999, which looks like this:

"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3695459,1972,4659,1970,"BE","",225495,3,,414,5,51,,2,,0.5,,16.5,,,,0,0
3695460,1972,4659,1970,"IT","",,1,,414,5,51,,1,,0,,28,,,,,
3866063,1975,5520,1973,"US","CA",188385,2,5,327,4,41,7,11,1,0.7438,0.4082,10.8182,5,0,0,0,0
3866064,1975,5520,1973,"US","FL",242085,2,10,327,4,41,5,11,1,0.314,0,10.9091,3.6,0,0,0,0
4548215,1985,9426,1983,"DE","",243670,3,25,131,6,61,2,7,1,0.2449,0,5.8571,19.5,0,0,0,0
4548216,1985,9426,1982,"GB","",382615,3,7,131,6,61,10,3,0.6,0,0.2778,5,20,0.5,0.2,0,0
...

Here are the meanings of the attributes relevant to this exercise: GYEAR is the year the patent was granted, COUNTRY is the inventor's country code, and CLAIMS is the number of claims.

I need to implement a MapReduce Java class (containing a Partitioner) that computes the average number of claims per patent since 1975, grouped by country. The Partitioner splits the results into two groups: results for AU go in the first group and results for all other countries in the second. I then need to display the AU results with a Hadoop command.
Here is my code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgClaimNumbersPerPatentByCountry extends Configured implements Tool {
    
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "Avg Claims/Patent by Country since 1975"); 
        job.setJarByClass(AvgClaimNumbersPerPatentByCountry.class); 
        job.setMapperClass(MyMapper.class); //the Mapper class 
        job.setReducerClass(MyReducer.class); //the Reducer class 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        job.setCombinerClass(MyReducer.class); //the Combiner class
        job.setPartitionerClass(MyPartitioner.class); //the Partitioner class
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AvgClaimNumbersPerPatentByCountry(), args);
        System.exit(res);
    }

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text countryAsKey = new Text();
        private IntWritable claimsAsValue = new IntWritable(1);
        
        //map() function vvv
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (line.contains("PATENT")) {
                return; //skip the first line
            }
            else {
                String[] patentData = line.split(",");
                countryAsKey.set(patentData[4]);
                if (Integer.parseInt(patentData[1]) >= 1975) {
                    claimsAsValue.set(Integer.parseInt(patentData[8]));
                }
            }
            context.write(countryAsKey, claimsAsValue);     
        }
        //map() function ^^^
    }
    
    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        //getPartition() function vvv
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String country = key.toString();
            if (country.toLowerCase().matches("AU")) {
                return 0;
            }
            else {
                return 1;
            }
        }
        //getPartition() function ^^^
        
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
        //reduce() function vvv
        @Override
        public void reduce(Text countryKey, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int totalClaimsOfCountry = 0;
            int totalPatentsOfCountry = 0;
            FloatWritable avgClaim = new FloatWritable();

            for (IntWritable value : values) {
                totalClaimsOfCountry += value.get();
                totalPatentsOfCountry += 1;
            }
            avgClaim.set(calculateAvgClaimPerPatent(totalClaimsOfCountry, totalPatentsOfCountry));

            context.write(countryKey, avgClaim);
        }
        //reduce() function ^^^

        public float calculateAvgClaimPerPatent(int totalClaims, int totalPatents) {
            float avg = (float)totalClaims / totalPatents;
            return avg;
        }
    }

}

When I run the MapReduce application on Hadoop, I get the following error:

21/08/19 01:18:50 INFO mapreduce.Job: Task Id : attempt_1629275188478_0001_m_000001_0, Status : FAILED Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.FloatWritable, received org.apache.hadoop.io.IntWritable

I understand there is a type mismatch in my code (an IntWritable is being passed where a FloatWritable is expected), but I don't know how to fix it. I would really appreciate any ideas or help. If there is anything else wrong with my code, please point that out too, so I can improve in future exercises :)
Thanks in advance!

Answer 1, by tez616oj:

I think it will work if you remove this line:

job.setCombinerClass(MyReducer.class);

You are using your reducer as a combiner, but your reducer reads IntWritable and writes FloatWritable, so the following happens:
1. The mapper writes IntWritable.
2. The combiner reads IntWritable and writes FloatWritable.
3. The reducer tries to read IntWritable, but receives FloatWritable and throws an exception.
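
One thing to add: removing the combiner alone may not clear the exact error in your log. "Type mismatch in value from map" is raised the moment the mapper writes its output, and because run() never calls setMapOutputValueClass, the expected map output value class falls back to the job-wide output value class, which your second setOutputValueClass call sets to FloatWritable. Here is a minimal sketch of a driver configuration with the types lined up (same class names as in your question; setNumReduceTasks(2) is my assumption, so that the two partitions land in separate output files):

// Driver sketch: declare the intermediate (map) types separately from the final (reduce) types
Job job = Job.getInstance(conf, "Avg Claims/Patent by Country since 1975");
job.setJarByClass(AvgClaimNumbersPerPatentByCountry.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);            // no combiner
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(2);                        // assumption: one reducer per partition (AU vs. the rest)
job.setMapOutputKeyClass(Text.class);            // the mapper emits (Text, IntWritable)
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);               // the reducer emits (Text, FloatWritable)
job.setOutputValueClass(FloatWritable.class);

If you do want a combiner later, it cannot be MyReducer: a combiner's output types must match the map output types, and averaging is not associative, so a correct combiner would have to emit partial sums and counts rather than averages.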

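Since you also asked for other problems: in MyPartitioner, country.toLowerCase().matches("AU") can never be true, because the lowercased string is tested against an uppercase pattern; key.toString().equals("AU") is what you want. Note also that a plain split(",") keeps the CSV quotation marks, so the country token is "AU" with quotes unless you strip them. And in MyMapper, a pair is written for every data line even when GYEAR is before 1975 (claimsAsValue then still holds whatever the previous record set), and parsing an empty CLAIMS field throws a NumberFormatException. A sketch of a map() body that guards against all of this, keeping the field indexes from your question:

String line = value.toString();
if (line.startsWith("\"PATENT\"")) {
    return; // skip the header row
}
String[] patentData = line.split(",");
// CLAIMS (index 8) is empty on many rows, so guard before parsing
if (patentData.length > 8
        && !patentData[1].isEmpty() && !patentData[8].isEmpty()
        && Integer.parseInt(patentData[1]) >= 1975) {
    countryAsKey.set(patentData[4].replace("\"", "")); // strip the CSV quotes around COUNTRY
    claimsAsValue.set(Integer.parseInt(patentData[8]));
    context.write(countryAsKey, claimsAsValue); // emit only rows that pass the 1975 filter
}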