第二次迭代-值保持不变

5tmbdcev  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(364)

我正在使用hadoop开发一个mapreduce程序。
我的减速机中有这部分代码:

public void reduce(Text key, Iterable<TextLongWritable> values,Context context) throws IOException, InterruptedException {

    long word1count = 0;
    List<TextLongWritable> cache = new ArrayList<TextLongWritable>();

    String decade = key.toString().split("\t")[0];
    String word1 = key.toString().split("\t")[1];

    for (TextLongWritable val : values) {
        if (val.getWord().equals("*")){
            word1count += val.getCount();
            continue;
        }
        cache.add(val);
        log.info("***Reducer***Word1: " + word1 + "  Word2: " + val.getWord());
    }

    context.write(key, new Text("" + word1count));

    for (TextLongWritable value : cache) {
        if (value.getWord().equals("*")){
            continue;
        }
        log.info("***Reducer***Word1: " + word1 + "  Word2: " + value.getWord());
        context.write(new Text(decade + "\t" + value.getWord()), new Text(word1 + " " + value.getCount() + "\t" + word1count));
    }

}

首先,正如我在这里看到的那样,我使用缓存来迭代两次值。
我的问题是,在第二个循环中,所有值都保持不变。例如,如果我有一个单词列表 one two three . 假设关键是 1900 test ,所以 word1 = "test" .
第一个记录器输出为:


***Reducer***Word1: test  Word2: one
***Reducer***Word1: test  Word2: two
***Reducer***Word1: test  Word2: three

但第二个记录器输出将是:


***Reducer***Word1: test  Word2: one
***Reducer***Word1: test  Word2: one
***Reducer***Word1: test  Word2: one

由于某些原因,该值保持不变。
我做错什么了?这和hadoop有关吗?

kmb7vmvb

kmb7vmvb1#

由于gc开销,hadoop在反序列化期间缓存相同的对象。你必须克隆或深度复制你的 TextLongWritable 为了把它收集起来。

4ngedf3f

4ngedf3f2#

我通过参考这一页设法解决了这个问题。实际上,我首先回顾了所有这些案例,这个案例是那一页中第二个错误的例子。
关于在mapreduce post中管理迭代器的一些说明。
所以我要做的就是在把我的价值添加到 cache .
我的工作代码如下:

public void reduce(Text key, Iterable<TextLongWritable> values,Context context) throws IOException, InterruptedException {

    long word1count = 0;
    List<TextLongWritable> cache = new ArrayList<TextLongWritable>();

    String decade = key.toString().split("\t")[0];
    String word1 = key.toString().split("\t")[1];

    for (TextLongWritable val : values) {
        if (val.getWord().equals("*")){
            word1count += val.getCount();
            continue;
        }
        TextLongWritable val_copy = new TextLongWritable(val.getWord(),val.getCount());
        cache.add(val_copy);
    }

    context.write(key, new Text("" + word1count));

    for (TextLongWritable value : cache) {
        context.write(new Text(decade + "\t" + value.getWord()), new Text(word1 + " " + value.getCount() + "\t" + word1count));
    }
}

相关问题