java hadoop错误的值类:类ratiocount$writablearray不是类org.apache.hadoop.io.doublewritable

mklgxw1f  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(275)

我正在努力学习hadoop。我有一个文本文件,每行包含一个交通流。信息用逗号分隔。我想让我的map函数输出一个字符串,我构建这个字符串来标识一个流,类似这样:“123.124.32.6 14.23.64.21 80 tcp”作为一个键和一些双精度值(一个数字)。我希望reduce函数将相同的字符串作为键输出,并作为值从所有相似的键中获取所有值并将它们放入数组中。所以我想要这样的东西:“123.124.32.6 14.23.64.21 80 tcp”:[0.3-0.11-10.5]作为我的最终输出。当我运行它时,我得到一个错误:
错误:java.io.ioexception:错误的值class:class ratiocount$writeablearray不是class org.apache.hadoop.io.doublewriteable
你能指出我的错误和如何改正吗?
这是我的密码:

public class RatioCount {

public static class WritableArray extends ArrayWritable {

    public WritableArray(Class<? extends Writable> valueClass, Writable[] values) {
        super(valueClass, values);
    }
    public WritableArray(Class<? extends Writable> valueClass) {
        super(valueClass);
    }

    @Override
    public DoubleWritable[] get() {
        return (DoubleWritable[]) super.get();
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        System.out.println("write method called");
        super.write(arg0);
    }
    @Override
    public String toString() {
        return Arrays.toString(get());
    }

}

public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "ratio count");

    job.setJarByClass(RatioCount.class);
    job.setMapperClass(MyMapper.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    job.setOutputValueClass(WritableArray.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

public static class MyReducer
        extends Reducer<Text, DoubleWritable, Text, WritableArray> {

    private final IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        ArrayList<DoubleWritable> list = new ArrayList<DoubleWritable>();

        for(DoubleWritable value :values){
            list.add(value);
        }
        context.write(key, new WritableArray(DoubleWritable.class, list.toArray(new DoubleWritable[list.size()])));
    }

}

public static class MyMapper extends Mapper<Object, Text, Text, DoubleWritable> {

    private final Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        if (value.toString().contains("StartTime")) {
            return;
        }
        DoubleWritable ratio;
StringTokenizer(value.toString(),",");
            String[] tokens = value.toString().split(",");
            StringBuilder sb = new StringBuilder();
            sb.append(tokens[2]);
            sb.append(tokens[3]);
            sb.append(tokens[6]);
            sb.append(tokens[7]);
            System.out.println(sb.toString());
            word.set(sb.toString());  
            double sappbytes = Double.parseDouble(tokens[13]);
            double totbytes = Double.parseDouble(tokens[14]);
            double dappbytes = totbytes - sappbytes;

            ratio = new DoubleWritable((sappbytes - dappbytes) / totbytes);
            context.write(word, ratio);

        }
    }
}
krcsximq

krcsximq1#

你的问题是这行: job.setCombinerClass(MyReducer.class); 组合器必须接收和发射相同的类型。在您的情况下,您有: Reducer<Text, DoubleWritable, Text, WritableArray> 将输出一个 WritableArray 但是下面的例子 DoubleWritable .
您应该删除合并器,或者重新编写它(作为一个单独的类到您的reducer中),以便它接受 Text, DoubleWriteable 发出相同类型的光。

相关问题