如何将json文件读入avro mapreduce?

2ul0zpep  于 2021-07-13  发布在  Hadoop
关注(0)|答案(1)|浏览(505)

我正在尝试使用avro为hadoop编写一个mapreduce作业。目标是用avro加载json文件,然后在其上运行一些reducer。
因为我想在mapreduce中减少它,而且它应该高效地运行,所以我不想在运行作业之前将磁盘上的json文件转换为avro文件。
不幸的是,我得到以下错误: Error: java.lang.ClassCastException: class org.apache.hadoop.io.Text cannot be cast to class org.apache.avro.mapred.AvroKey (org.apache.hadoop.io.Text and org.apache.avro.mapred.AvroKey are in unnamed module of loader 'app' 字数.java

package mz;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount extends Configured implements Tool {

    public static class TokenizerMapper extends
            Mapper<AvroKey<Review>, Text, Text, IntWritable> {

        private final static IntWritable ONE = new IntWritable(1);

        @Override
        public void map(AvroKey<Review> key, Text value, Context context)
                throws IOException, InterruptedException {

            String category = key.datum().getCategory().toString();
            String reviewText = key.datum().getReviewText().toString();

            StringTokenizer itr = new StringTokenizer(reviewText);
            while (itr.hasMoreTokens()) {
                context.write(new Text(category + ":" + itr.nextToken()), ONE);
            }

        }
    }

    public static class CountReducer extends
            Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(new AvroKey<>(key.toString()), new AvroValue<>(sum));
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: mz.MapReduceColorCount <input path> <output path>");
            return -1;
        }

        Job job = new Job(getConf());
        job.setJarByClass(WordCount.class);
        job.setJobName("Color Count");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapperClass(TokenizerMapper.class);
        AvroJob.setInputKeySchema(job, Review.getClassSchema());
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
        job.setReducerClass(CountReducer.class);
        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new WordCount(), args);
        System.exit(res);
    }
}

评论.avsc

{
  "type" : "record",
  "name" : "Review",
  "namespace" : "mz",
  "fields" : [ {
    "name" : "reviewerID",
    "type" : "string"
  }, {
    "name" : "asin",
    "type" : "string"
  }, {
    "name" : "reviewerName",
    "type" : "string"
  }, {
    "name" : "helpful",
    "type" : {
      "type" : "array",
      "items" : "long"
    }
  }, {
    "name" : "reviewText",
    "type" : "string"
  }, {
    "name" : "overall",
    "type" : "double"
  }, {
    "name" : "summary",
    "type" : "string"
  }, {
    "name" : "unixReviewTime",
    "type" : "long"
  }, {
    "name" : "reviewTime",
    "type" : "string"
  }, {
    "name" : "category",
    "type" : "string"
  } ]
}
7qhs6swi

7qhs6swi1#

keyvaluetextinputformat要求Map输入为(文本,文本)元组,默认情况下,这些元组在文件中按制表符拆分。
因此,尝试用avrokey类读取输入是不正确的,如果需要的话,需要使用不同的inputformat。否则,您会说您正在读取json文件,所以您的mapper输入将不是avro,而是文本
此外,我强烈建议使用spark来读/写json和avro,因为mapreduce没有强大的json支持

相关问题