How do I read a JSON file into an Avro MapReduce job?

2ul0zpep asked on 2021-07-13 in Hadoop

I am trying to write a MapReduce job for Hadoop using Avro. The goal is to load a JSON file with Avro and then run some reducers over it.
Since I want to do the reduction in MapReduce, and it should run efficiently, I don't want to convert the JSON file on disk into an Avro file before running the job.
Unfortunately, I get the following error: Error: java.lang.ClassCastException: class org.apache.hadoop.io.Text cannot be cast to class org.apache.avro.mapred.AvroKey (org.apache.hadoop.io.Text and org.apache.avro.mapred.AvroKey are in unnamed module of loader 'app')

WordCount.java

package mz;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount extends Configured implements Tool {

    public static class TokenizerMapper extends
            Mapper<AvroKey<Review>, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);

        @Override
        public void map(AvroKey<Review> key, Text value, Context context)
                throws IOException, InterruptedException {
            String category = key.datum().getCategory().toString();
            String reviewText = key.datum().getReviewText().toString();
            StringTokenizer itr = new StringTokenizer(reviewText);
            while (itr.hasMoreTokens()) {
                context.write(new Text(category + ":" + itr.nextToken()), ONE);
            }
        }
    }

    public static class CountReducer extends
            Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(new AvroKey<>(key.toString()), new AvroValue<>(sum));
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: mz.WordCount <input path> <output path>");
            return -1;
        }
        Job job = new Job(getConf());
        job.setJarByClass(WordCount.class);
        job.setJobName("Word Count");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(TokenizerMapper.class);
        AvroJob.setInputKeySchema(job, Review.getClassSchema());
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
        job.setReducerClass(CountReducer.class);
        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new WordCount(), args);
        System.exit(res);
    }
}

Review.avsc

{
  "type": "record",
  "name": "Review",
  "namespace": "mz",
  "fields": [
    { "name": "reviewerID", "type": "string" },
    { "name": "asin", "type": "string" },
    { "name": "reviewerName", "type": "string" },
    { "name": "helpful", "type": { "type": "array", "items": "long" } },
    { "name": "reviewText", "type": "string" },
    { "name": "overall", "type": "double" },
    { "name": "summary", "type": "string" },
    { "name": "unixReviewTime", "type": "long" },
    { "name": "reviewTime", "type": "string" },
    { "name": "category", "type": "string" }
  ]
}

7qhs6swi 1#

KeyValueTextInputFormat expects the map input to be (Text, Text) pairs, which by default are split on the first tab character of each line (a line containing no tab becomes the key, with an empty value).
Therefore, trying to read the input as an AvroKey is incorrect; if that is what you want, you need a different InputFormat. Otherwise, since you say you are reading JSON files, your mapper input will not be Avro but Text, and you have to parse the JSON yourself, as sketched below.
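
For illustration, a minimal sketch of such a mapper, assuming one JSON review per line and Jackson (com.fasterxml.jackson) on the classpath; the class name JsonTokenizerMapper is my own. The driver would then use TextInputFormat instead of KeyValueTextInputFormat and drop the AvroJob.setInputKeySchema call:

package mz;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

// Reads plain text lines (TextInputFormat) and parses each one as a JSON
// review, instead of expecting the input format to hand over AvroKeys.
public class JsonTokenizerMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final ObjectMapper jsonMapper = new ObjectMapper();

    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Field names follow the Review.avsc schema above.
        JsonNode review = jsonMapper.readTree(line.toString());
        String category = review.get("category").asText();
        String reviewText = review.get("reviewText").asText();
        StringTokenizer itr = new StringTokenizer(reviewText);
        while (itr.hasMoreTokens()) {
            context.write(new Text(category + ":" + itr.nextToken()), ONE);
        }
    }
}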
Also, I would strongly recommend using Spark to read and write JSON and Avro, since MapReduce has no strong JSON support.
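
For example, a minimal Spark sketch of that approach, assuming the external spark-avro package (e.g. org.apache.spark:spark-avro_2.12) is on the classpath; the JsonToAvro class name and the path arguments are illustrative:

package mz;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonToAvro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JsonToAvro")
                .getOrCreate();

        // Spark's JSON source parses line-delimited JSON and infers the
        // schema, so no hand-written Review schema is needed here.
        Dataset<Row> reviews = spark.read().json(args[0]);

        // Aggregations (like the word count above) can be expressed directly
        // on the Dataset; here we just write the records back out as Avro.
        reviews.write().format("avro").save(args[1]);

        spark.stop();
    }
}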
