我正在努力学习hadoop。我有一个文本文件,每行包含一个交通流。信息用逗号分隔。我想让我的map函数输出一个字符串,我构建这个字符串来标识一个流,类似这样:“123.124.32.6 14.23.64.21 80 tcp”作为一个键和一些双精度值(一个数字)。我希望reduce函数将相同的字符串作为键输出,并作为值从所有相似的键中获取所有值并将它们放入数组中。所以我想要这样的东西:“123.124.32.6 14.23.64.21 80 tcp”:[0.3-0.11-10.5]作为我的最终输出。当我运行它时,我得到一个错误:
错误:java.io.ioexception:错误的值class:class ratiocount$writeablearray不是class org.apache.hadoop.io.doublewriteable
你能指出我的错误和如何改正吗?
这是我的密码:
public class RatioCount {
public static class WritableArray extends ArrayWritable {
public WritableArray(Class<? extends Writable> valueClass, Writable[] values) {
super(valueClass, values);
}
public WritableArray(Class<? extends Writable> valueClass) {
super(valueClass);
}
@Override
public DoubleWritable[] get() {
return (DoubleWritable[]) super.get();
}
@Override
public void write(DataOutput arg0) throws IOException {
System.out.println("write method called");
super.write(arg0);
}
@Override
public String toString() {
return Arrays.toString(get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "ratio count");
job.setJarByClass(RatioCount.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyReducer.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputValueClass(WritableArray.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class MyReducer
extends Reducer<Text, DoubleWritable, Text, WritableArray> {
private final IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException {
ArrayList<DoubleWritable> list = new ArrayList<DoubleWritable>();
for(DoubleWritable value :values){
list.add(value);
}
context.write(key, new WritableArray(DoubleWritable.class, list.toArray(new DoubleWritable[list.size()])));
}
}
public static class MyMapper extends Mapper<Object, Text, Text, DoubleWritable> {
private final Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
if (value.toString().contains("StartTime")) {
return;
}
DoubleWritable ratio;
StringTokenizer(value.toString(),",");
String[] tokens = value.toString().split(",");
StringBuilder sb = new StringBuilder();
sb.append(tokens[2]);
sb.append(tokens[3]);
sb.append(tokens[6]);
sb.append(tokens[7]);
System.out.println(sb.toString());
word.set(sb.toString());
double sappbytes = Double.parseDouble(tokens[13]);
double totbytes = Double.parseDouble(tokens[14]);
double dappbytes = totbytes - sappbytes;
ratio = new DoubleWritable((sappbytes - dappbytes) / totbytes);
context.write(word, ratio);
}
}
}
1条答案
按热度按时间krcsximq1#
你的问题是这行:
job.setCombinerClass(MyReducer.class);
组合器必须接收和发射相同的类型。在您的情况下,您有:Reducer<Text, DoubleWritable, Text, WritableArray>
将输出一个WritableArray
但是下面的例子DoubleWritable
.您应该删除合并器,或者重新编写它(作为一个单独的类到您的reducer中),以便它接受
Text, DoubleWriteable
发出相同类型的光。