I'm new to MapReduce and currently working through a MapReduce exercise. I've been given a very large dataset of US patents granted between January 1963 and December 1999, which looks like this:
"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3695459,1972,4659,1970,"BE","",225495,3,,414,5,51,,2,,0.5,,16.5,,,,0,0
3695460,1972,4659,1970,"IT","",,1,,414,5,51,,1,,0,,28,,,,,
3866063,1975,5520,1973,"US","CA",188385,2,5,327,4,41,7,11,1,0.7438,0.4082,10.8182,5,0,0,0,0
3866064,1975,5520,1973,"US","FL",242085,2,10,327,4,41,5,11,1,0.314,0,10.9091,3.6,0,0,0,0
4548215,1985,9426,1983,"DE","",243670,3,25,131,6,61,2,7,1,0.2449,0,5.8571,19.5,0,0,0,0
4548216,1985,9426,1982,"GB","",382615,3,7,131,6,61,10,3,0.6,0,0.2778,5,20,0.5,0.2,0,0
...
Here are the meanings of the attributes that matter for this task: PATENT is the patent number, GYEAR the grant year, COUNTRY the country of the first inventor, and CLAIMS the number of claims.
I need to implement a MapReduce Java class (containing a Partitioner) that computes the average number of claims per patent, grouped by country, for patents granted since 1975. The Partitioner must split the results into two groups: results for AU go to the first group and results for all other countries go to the second. I then need to display the AU results with a Hadoop command.
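(If I understand the partitioner contract right, with two reduce tasks the AU group should end up in the first reducer's output file, so I expect to display it with something like hadoop fs -cat <output dir>/part-r-00000.)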
Here is my code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvgClaimNumbersPerPatentByCountry extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, "Avg Claims/Patent by Country since 1975");
        job.setJarByClass(AvgClaimNumbersPerPatentByCountry.class);
        job.setMapperClass(MyMapper.class); //the Mapper class
        job.setReducerClass(MyReducer.class); //the Reducer class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        job.setCombinerClass(MyReducer.class); //the Combiner class
        job.setPartitionerClass(MyPartitioner.class); //the Partitioner class
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AvgClaimNumbersPerPatentByCountry(), args);
        System.exit(res);
    }

    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text countryAsKey = new Text();
        private IntWritable claimsAsValue = new IntWritable(1);

        //map() function vvv
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (line.contains("PATENT")) {
                return; //skip the first line
            }
            else {
                String[] patentData = line.split(",");
                countryAsKey.set(patentData[4]);
                if (Integer.parseInt(patentData[1]) >= 1975) {
                    claimsAsValue.set(Integer.parseInt(patentData[8]));
                }
            }
            context.write(countryAsKey, claimsAsValue);
        }
        //map() function ^^^
    }

    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        //getPartition() function vvv
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            String country = key.toString();
            if (country.toLowerCase().matches("AU")) {
                return 0;
            }
            else {
                return 1;
            }
        }
        //getPartition() function ^^^
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
        //reduce() function vvv
        @Override
        public void reduce(Text countryKey, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int totalClaimsOfCountry = 0;
            int totalPatentsOfCountry = 0;
            FloatWritable avgClaim = new FloatWritable();
            for (IntWritable value : values) {
                totalClaimsOfCountry += value.get();
                totalPatentsOfCountry += 1;
            }
            avgClaim.set(calculateAvgClaimPerPatent(totalClaimsOfCountry, totalPatentsOfCountry));
            context.write(countryKey, avgClaim);
        }
        //reduce() function ^^^

        public float calculateAvgClaimPerPatent(int totalClaims, int totalPatents) {
            float avg = (float)totalClaims / totalPatents;
            return avg;
        }
    }
}
When I run the MapReduce application on Hadoop, I get the following error:
21/08/19 01:18:50 INFO mapreduce.Job: Task Id : attempt_1629275188478_0001_m_000001_0, Status : FAILED Error: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.FloatWritable, received org.apache.hadoop.io.IntWritable
I can see there is a type mismatch in the map output values (my mapper emits IntWritable while the framework expects FloatWritable), but I don't know how to fix it. I'd really appreciate any ideas and help. If anything else in my code is wrong, please point it out too, so I can improve in future exercises :)
Thanks in advance!
1 Answer
I think it would work if you removed this line:
job.setCombinerClass(MyReducer.class); //the Combiner class
You are using your reducer as the combiner, but your reducer reads IntWritable and writes FloatWritable, so the following happens:
1. The mapper writes IntWritable.
2. The combiner reads IntWritable and writes FloatWritable.
3. The reducer then expects to read IntWritable again, but by now the values are FloatWritable, hence the type mismatch. A combiner must read and write exactly the same types as the map output.
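Note also that your map output value class is currently FloatWritable rather than IntWritable: you never call setMapOutputValueClass, so it defaults to the final output value class, and the last of your two setOutputValueClass calls wins. That is why the error already appears at the map stage. For reference, here is a minimal sketch of how the job setup in run() could look (combiner dropped, map output types declared explicitly, duplicate setOutputValueClass removed, and two reduce tasks requested so your partitioner really produces two output files):

Job job = Job.getInstance(conf, "Avg Claims/Patent by Country since 1975");
job.setJarByClass(AvgClaimNumbersPerPatentByCountry.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setPartitionerClass(MyPartitioner.class); // no combiner for this job
job.setNumReduceTasks(2);                     // partition 0 -> part-r-00000, partition 1 -> part-r-00001
job.setMapOutputKeyClass(Text.class);         // what MyMapper emits
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);            // what MyReducer emits
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;

Averaging is not associative, so even with matching types you could not reuse this reducer as a combiner: it would average partial averages. If you want a combiner for this job, the mapper would have to emit something mergeable, such as (sum, count) pairs. Since you asked for other problems: country.toLowerCase().matches("AU") can never be true (a lower-cased string never matches "AU"), so use key.toString().equals("AU") instead; and in the mapper, pre-1975 records should be skipped with return rather than written out with the stale previous value of claimsAsValue.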