I'm trying to write a MapReduce job for Hadoop using Avro. The goal is to load a JSON file via Avro and then run some reducers on it.
Because I want to do the reduction in MapReduce, and because it should run efficiently, I don't want to convert the JSON files on disk into Avro files before running the job.
Unfortunately, I get the following error:
Error: java.lang.ClassCastException: class org.apache.hadoop.io.Text cannot be cast to class org.apache.avro.mapred.AvroKey (org.apache.hadoop.io.Text and org.apache.avro.mapred.AvroKey are in unnamed module of loader 'app')
WordCount.java
package mz;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount extends Configured implements Tool {

    public static class TokenizerMapper extends
            Mapper<AvroKey<Review>, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);

        @Override
        public void map(AvroKey<Review> key, Text value, Context context)
                throws IOException, InterruptedException {
            String category = key.datum().getCategory().toString();
            String reviewText = key.datum().getReviewText().toString();
            StringTokenizer itr = new StringTokenizer(reviewText);
            while (itr.hasMoreTokens()) {
                context.write(new Text(category + ":" + itr.nextToken()), ONE);
            }
        }
    }

    public static class CountReducer extends
            Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(new AvroKey<>(key.toString()), new AvroValue<>(sum));
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: mz.MapReduceColorCount <input path> <output path>");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJarByClass(WordCount.class);
        job.setJobName("Color Count");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(TokenizerMapper.class);
        AvroJob.setInputKeySchema(job, Review.getClassSchema());
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
        job.setReducerClass(CountReducer.class);
        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new WordCount(), args);
        System.exit(res);
    }
}
Review.avsc
{
  "type": "record",
  "name": "Review",
  "namespace": "mz",
  "fields": [
    { "name": "reviewerID", "type": "string" },
    { "name": "asin", "type": "string" },
    { "name": "reviewerName", "type": "string" },
    { "name": "helpful", "type": { "type": "array", "items": "long" } },
    { "name": "reviewText", "type": "string" },
    { "name": "overall", "type": "double" },
    { "name": "summary", "type": "string" },
    { "name": "unixReviewTime", "type": "long" },
    { "name": "reviewTime", "type": "string" },
    { "name": "category", "type": "string" }
  ]
}
1 Answer
KeyValueTextInputFormat requires the map input to be (Text, Text) tuples; by default each line of the file is split on the tab character to produce them, as sketched below.
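To make that concrete, here is a minimal mapper sketch (LineMapper is a made-up name for illustration) showing the only input signature KeyValueTextInputFormat can drive:

// Sketch: KeyValueTextInputFormat hands the mapper (Text, Text) pairs
// (the part of each line before the first tab is the key, the rest is
// the value), so the mapper's input types cannot be AvroKey<Review>.
public static class LineMapper extends Mapper<Text, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, ONE);
    }
}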
So trying to read the input as an AvroKey is simply incorrect; if you actually had Avro files, you would need a different InputFormat (such as AvroKeyInputFormat). But since you say you're reading JSON files, your mapper input won't be Avro at all, it will be Text; see the sketch after this paragraph.
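If the input really is one JSON object per line, one workaround is to keep the default TextInputFormat (input types (LongWritable, Text)) and parse each line in the mapper yourself. A sketch, assuming Jackson is on the classpath; JsonTokenizerMapper is a made-up name, and in run() you would drop the KeyValueTextInputFormat and AvroJob.setInputKeySchema lines:

// Needs, in addition to the imports already in WordCount.java:
// import com.fasterxml.jackson.databind.JsonNode;
// import com.fasterxml.jackson.databind.ObjectMapper;
public static class JsonTokenizerMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final ObjectMapper json = new ObjectMapper();

    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Parse the JSON record and emit "category:token" pairs, mirroring
        // what the original TokenizerMapper tried to do with Avro input.
        JsonNode review = json.readTree(line.toString());
        String category = review.get("category").asText();
        StringTokenizer itr = new StringTokenizer(review.get("reviewText").asText());
        while (itr.hasMoreTokens()) {
            context.write(new Text(category + ":" + itr.nextToken()), ONE);
        }
    }
}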
Beyond that, I'd strongly suggest using Spark to read/write JSON and Avro, since MapReduce has no first-class JSON support; a sketch follows.
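For completeness, a sketch of that Spark route (assumes the external spark-avro module is available, e.g. via --packages org.apache.spark:spark-avro_2.12; JsonToAvro is a made-up class name):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonToAvro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JsonToAvro")
                .getOrCreate();
        // Spark's JSON source expects one JSON object per line by default.
        Dataset<Row> reviews = spark.read().json(args[0]);
        // The "avro" format is provided by the spark-avro module.
        reviews.write().format("avro").save(args[1]);
        spark.stop();
    }
}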