我正在用mapreduce框架用java制作一个hadoop应用程序。
我只对输入和输出使用文本键和值。我使用一个组合器来做一个额外的计算步骤,然后再减少到最终的输出。
但我有一个问题,钥匙不去同一个减速机。我在combiner中创建并添加如下键/值对:
public static class Step4Combiner extends Reducer<Text,Text,Text,Text> {
private static Text key0 = new Text();
private static Text key1 = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
key0.set("KeyOne");
key1.set("KeyTwo");
context.write(key0, new Text("some value"));
context.write(key1, new Text("some other value"));
}
}
public static class Step4Reducer extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
System.out.print("Key:" + key.toString() + " Value: ");
String theOutput = "";
for (Text val : values) {
System.out.print("," + val);
}
System.out.print("\n");
context.write(key, new Text(theOutput));
}
}
总的来说,我创造的工作是这样的:
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job4 = new Job(conf, "Step 4");
job4.setJarByClass(Step4.class);
job4.setMapperClass(Step4.Step4Mapper.class);
job4.setCombinerClass(Step4.Step4Combiner.class);
job4.setReducerClass(Step4.Step4Reducer.class);
job4.setInputFormatClass(TextInputFormat.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job4, new Path(outputPath));
FileOutputFormat.setOutputPath(job4, new Path(finalOutputPath));
System.exit(job4.waitForCompletion(true) ? 0 : 1);
减速机打印的标准输出如下:
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
这是没有意义的,因为键是相同的,因此它应该是2个异径管,其中3个值是相同的
希望你能帮我弄清真相:)
2条答案
按热度按时间gywdnpxw1#
在组合器中尝试以下操作:
我看到这种事情发生的唯一方法就是
key0
从一个合路器得出的结果不等于key0
从另一个。我不确定如果键指向完全相同的示例(如果使键为静态的话会发生这种情况),它会如何工作。zbsbpyhn2#
这很可能是因为您的组合器同时运行在map和reduce阶段(一个鲜为人知的“特性”)。
基本上,您正在修改合并器中的键,它可能会运行,也可能不会运行,因为map输出在reducer中合并在一起。在组合器运行之后(reduce侧),键通过分组比较器来确定返回给reduce方法的iterable的值(我在这里绕过reduce阶段的流方面-iterable不是由一组或一组值支持的,对iterator()的更多调用。如果分组比较器确定当前键和最后一个键相同,则next()返回true)
您可以尝试通过检查上下文(有一个
Context.getTaskAttempt().isMap()
方法,但我有一些记忆,这也是有问题的,甚至可能有一张关于这个的jira罚单)。底线是,不要修改合路器中的键,除非你能找到绕过这个漏洞的方法,如果合路器运行在reduce侧。
编辑以便调查@amar的评论,我整理了一些代码(pastebin链接),其中添加了一些详细的比较器、组合器、还原器等。如果您运行一个map作业,那么在reduce阶段将不会运行任何组合器,并且不会再次对map输出进行排序,因为它已经被假定为已排序。
它被假定为在被发送到combiner类之前被排序,并且它假定密钥将原封不动地出来-因此仍然被排序。记住,组合器是用来组合给定键的值的。
因此,对于一个单独的Map和给定的组合器,reducer可以看到keyone,keytwo,keyone,keytwo,keyone顺序的键。分组比较器看到它们之间的转换,因此您得到6个reduce函数调用
如果您使用两个Map器,那么reducer知道它有两个已排序的段(每个Map一个),因此在缩减之前仍然需要对它们进行排序-但是由于段的数量低于阈值,因此排序是作为内联流排序来完成的(同样假定这些段已排序)。使用两个Map器(reduce阶段输出的10条记录)仍然是错误的输出。
同样,不要修改组合器中的键,这不是组合器的用途。