我正在使用hadoop开发一个mapreduce程序。
我的减速机中有这部分代码:
public void reduce(Text key, Iterable<TextLongWritable> values,Context context) throws IOException, InterruptedException {
long word1count = 0;
List<TextLongWritable> cache = new ArrayList<TextLongWritable>();
String decade = key.toString().split("\t")[0];
String word1 = key.toString().split("\t")[1];
for (TextLongWritable val : values) {
if (val.getWord().equals("*")){
word1count += val.getCount();
continue;
}
cache.add(val);
log.info("***Reducer***Word1: " + word1 + " Word2: " + val.getWord());
}
context.write(key, new Text("" + word1count));
for (TextLongWritable value : cache) {
if (value.getWord().equals("*")){
continue;
}
log.info("***Reducer***Word1: " + word1 + " Word2: " + value.getWord());
context.write(new Text(decade + "\t" + value.getWord()), new Text(word1 + " " + value.getCount() + "\t" + word1count));
}
}
首先,正如我在这里看到的那样,我使用缓存来迭代两次值。
我的问题是,在第二个循环中,所有值都保持不变。例如,如果我有一个单词列表 one
two three
. 假设关键是 1900 test
,所以 word1 = "test"
.
第一个记录器输出为:
***Reducer***Word1: test Word2: one
***Reducer***Word1: test Word2: two
***Reducer***Word1: test Word2: three
但第二个记录器输出将是:
***Reducer***Word1: test Word2: one
***Reducer***Word1: test Word2: one
***Reducer***Word1: test Word2: one
由于某些原因,该值保持不变。
我做错什么了?这和hadoop有关吗?
2条答案
按热度按时间kmb7vmvb1#
由于gc开销,hadoop在反序列化期间缓存相同的对象。你必须克隆或深度复制你的
TextLongWritable
为了把它收集起来。4ngedf3f2#
我通过参考这一页设法解决了这个问题。实际上,我首先回顾了所有这些案例,这个案例是那一页中第二个错误的例子。
关于在mapreduce post中管理迭代器的一些说明。
所以我要做的就是在把我的价值添加到
cache
.我的工作代码如下: