Hadoop MapReduce: unordered tuple as map key

a7qyws3x posted on 2021-05-30 in Hadoop

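Based on the Hadoop WordCount example (from Hadoop: The Definitive Guide), I developed a MapReduce job that counts the occurrences of unordered string tuples. The input looks like this (just larger):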
a b
c c
d d
b a
a d
d d
When I run the MapReduce job, I expect this output (for this example):
c c 1
d d 2
a b 2
a d 1
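That is, I want the tuples a,b and b,a to be treated as the same key. A related question has already been asked here: Hadoop MapReduce: Two values as key in Mapper-Reducer, and the problem may already be addressed here: https://developer.yahoo.com/hadoop/tutorial/module5.html#keytypes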
For a large input file, the output looks like this, where the first column is the hashCode() of the respective key:
151757761 a a 62822
153322274 a b 62516
154886787 a c 62248
156451300 a d 62495
153322274 b a 62334
154902916 b b 62232
158064200 b d 62759
154886787 c a 62200
156483558 c b 124966
158080329 c c 62347
159677100 d c 125047
156451300 d a 62653
158064200 d b 62603
161290000 d d 62778
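As you can see, some keys are duplicated, e.g. a,b and b,a, which share the hash 153322274. For others, such as c,b (and b,c) and d,c (and c,d), the count is correct: since the test data is drawn uniformly at random, their count is roughly twice that of the others.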
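The hash column can be checked by hand. For single-character strings, the question's hashCode() reduces to 127² × codePoint(first) × codePoint(second), and multiplication is commutative, so a,b and b,a necessarily share 153322274 (= 16129 × 97 × 98). A minimal plain-Java sketch that copies the question's polynomial into a hypothetical helper `pairHash` (no Hadoop types involved):

```java
import java.math.BigInteger;

// Recomputes the question's Pair.hashCode() for two strings, without Hadoop.
public class PairHashCheck {

    // Same polynomial as the question: sum over i of 127^(i+1) * codePointAt(i).
    static BigInteger weight(String s) {
        BigInteger sum = BigInteger.ZERO;
        for (int i = 0; i < s.length(); i++) {
            sum = sum.add(BigInteger.valueOf(127L).pow(i + 1)
                    .multiply(BigInteger.valueOf(s.codePointAt(i))));
        }
        return sum;
    }

    // The product of the two weights is symmetric, so swapping the arguments cannot change it.
    static int pairHash(String first, String second) {
        return weight(first).multiply(weight(second)).intValue();
    }

    public static void main(String[] args) {
        System.out.println(pairHash("a", "b"));
        System.out.println(pairHash("b", "a"));
        System.out.println(pairHash("d", "d"));
    }
}
```

Equal hashes also mean both keys go to the same partition under Hadoop's default HashPartitioner, so the duplicates cannot be explained by the hash alone.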
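I've been searching for the cause for a while now and have run out of ideas why there are still duplicate keys after the reduce phase.
Here is the code I'm using.
First, my custom Writable: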

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigInteger;

public class Pair implements WritableComparable<Pair> {

    private String first;
    private String second;

    public Pair(String first, String second) {
        this.first = first;
        this.second = second;
    }

    public Pair() {
       this("", "");
    }

    @Override
    public String toString() {
        return this.hashCode() + "\t" + first + "\t" + second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, first);
        WritableUtils.writeString(out, second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = WritableUtils.readString(in);
        second = WritableUtils.readString(in);
    }

    @Override
    public int hashCode() {
        BigInteger bA = BigInteger.ZERO;
        BigInteger bB = BigInteger.ZERO;

        for(int i = 0; i < first.length(); i++) {
            bA = bA.add(BigInteger.valueOf(127L).pow(i+1).multiply(BigInteger.valueOf(first.codePointAt(i))));
        }

        for(int i = 0; i < second.length(); i++) {
            bB = bB.add(BigInteger.valueOf(127L).pow(i+1).multiply(BigInteger.valueOf(second.codePointAt(i))));
        }

        return bA.multiply(bB).intValue();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof Pair) {
            Pair other = (Pair) o;

            boolean result = ( first.compareTo(other.first) == 0 && second.compareTo(other.second) == 0 )
                    || ( first.compareTo(other.second) == 0 && second.compareTo(other.first) == 0 );

            return result;
        }
        return false;
    }

    @Override
    public int compareTo(Pair other) {
        if (( first.compareTo(other.first) == 0 && second.compareTo(other.second) == 0 )
                || ( first.compareTo(other.second) == 0 && second.compareTo(other.first) == 0 ) ) {
            return 0;
        } else {
            int cmp = first.compareTo( other.first );

            if (cmp != 0) {
                return cmp;
            }

            return second.compareTo( other.second );
        }
    }
}

The rest:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PairCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length < 2) {
            System.err.println("Usage: paircount <in-dir> <out-dir>");
            System.exit(2);
        }

        Job job = new Job(conf, "word count");
        job.setJarByClass(PairCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(Pair.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Pair.class);
        job.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Pair, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());

            while (itr.hasMoreTokens()) {
                context.write(new Pair(itr.nextToken(), itr.nextToken()), one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Pair, IntWritable, Pair, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Pair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;

            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write( key, result);
        }
    }
}

Edit: I added unit tests for the hashCode() and compareTo() functions. They pass.

import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

public class Tests  {
    @Test
    public void testPairComparison() {
        assertTrue( 0 == new Pair("a", "a").compareTo(new Pair("a", "a")) );
        assertTrue( 0 == new Pair("a", "b").compareTo(new Pair("b", "a")) );
        assertTrue( 0 == new Pair("a", "c").compareTo(new Pair("c", "a")) );
        assertTrue( 0 == new Pair("a", "d").compareTo(new Pair("d", "a")) );

        assertTrue( 0 == new Pair("b", "b").compareTo(new Pair("b", "b")) );
        assertTrue( 0 == new Pair("b", "c").compareTo(new Pair("c", "b")) );
        assertTrue( 0 == new Pair("b", "d").compareTo(new Pair("d", "b")) );

        assertTrue( 0 == new Pair("c", "c").compareTo(new Pair("c", "c")) );
        assertTrue( 0 == new Pair("c", "d").compareTo(new Pair("d", "c")) );

        assertTrue( 0 == new Pair("d", "d").compareTo(new Pair("d", "d")) );

        assertTrue( 0 > new Pair("a", "a").compareTo(new Pair("b", "b")) );
        assertTrue( 0 > new Pair("a", "a").compareTo(new Pair("c", "b")) );
        assertTrue( 0 < new Pair("d", "d").compareTo(new Pair("c", "b")) );
        assertTrue( 0 < new Pair("c", "d").compareTo(new Pair("c", "a")) );
    }

    @Test
    public void testPairHashcode(){
        assertTrue( 0 != new Pair("a", "a").hashCode());
        assertTrue( 0 != new Pair("a", "b").hashCode());
        assertTrue( 0 != new Pair("a", "c").hashCode());
        assertTrue( 0 != new Pair("a", "d").hashCode());

        assertTrue( 0 != new Pair("b", "b").hashCode());
        assertTrue( 0 != new Pair("b", "c").hashCode());
        assertTrue( 0 != new Pair("b", "d").hashCode());

        assertTrue( 0 != new Pair("c", "c").hashCode());
        assertTrue( 0 != new Pair("c", "d").hashCode());

        assertTrue( 0 != new Pair("d", "d").hashCode());

        assertEquals( new Pair("a", "a").hashCode(), new Pair("a", "a").hashCode() );
        assertEquals( new Pair("a", "b").hashCode(), new Pair("b", "a").hashCode() );
        assertEquals( new Pair("a", "c").hashCode(), new Pair("c", "a").hashCode() );
        assertEquals( new Pair("a", "d").hashCode(), new Pair("d", "a").hashCode() );

        assertEquals( new Pair("b", "b").hashCode(), new Pair("b", "b").hashCode() );
        assertEquals( new Pair("b", "c").hashCode(), new Pair("c", "b").hashCode() );
        assertEquals( new Pair("b", "d").hashCode(), new Pair("d", "b").hashCode() );

        assertEquals( new Pair("c", "c").hashCode(), new Pair("c", "c").hashCode() );
        assertEquals( new Pair("c", "d").hashCode(), new Pair("d", "c").hashCode() );

        assertEquals( new Pair("d", "d").hashCode(), new Pair("d", "d").hashCode() );

        assertNotEquals( new Pair("a", "a").hashCode(), new Pair("b", "b").hashCode() );
        assertNotEquals( new Pair("a", "b").hashCode(), new Pair("b", "d").hashCode() );
        assertNotEquals( new Pair("a", "c").hashCode(), new Pair("d", "a").hashCode() );
        assertNotEquals( new Pair("a", "d").hashCode(), new Pair("a", "a").hashCode() );
    }
}
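These tests check compareTo() one pair at a time. The Comparable contract also requires transitivity, and a brute-force check over all ordered pairs built from {a, b, c, d} finds a violation in the original logic: a,b < a,c and a,c < b,a, yet a,b compares equal to b,a. A minimal sketch, re-implementing the original compareTo() on plain strings in a hypothetical class `CompareToContractCheck` so it runs without Hadoop:

```java
import java.util.ArrayList;
import java.util.List;

// Brute-force transitivity check for the question's original compareTo() logic.
public class CompareToContractCheck {

    // Copy of the original ordering: equal if identical or swapped, else lexicographic.
    static int originalCompare(String f1, String s1, String f2, String s2) {
        boolean same = f1.equals(f2) && s1.equals(s2);
        boolean swapped = f1.equals(s2) && s1.equals(f2);
        if (same || swapped) {
            return 0;
        }
        int cmp = f1.compareTo(f2);
        return cmp != 0 ? cmp : s1.compareTo(s2);
    }

    // Looks for x < y and y < z with x not < z; returns a description, or null if none.
    static String findTransitivityViolation(String... letters) {
        List<String[]> keys = new ArrayList<>();
        for (String f : letters) {
            for (String s : letters) {
                keys.add(new String[] {f, s});
            }
        }
        for (String[] x : keys) {
            for (String[] y : keys) {
                for (String[] z : keys) {
                    boolean xLessY = originalCompare(x[0], x[1], y[0], y[1]) < 0;
                    boolean yLessZ = originalCompare(y[0], y[1], z[0], z[1]) < 0;
                    boolean xLessZ = originalCompare(x[0], x[1], z[0], z[1]) < 0;
                    if (xLessY && yLessZ && !xLessZ) {
                        return String.join(",", x) + " < " + String.join(",", y) + " < "
                                + String.join(",", z) + ", but " + String.join(",", x)
                                + " is not < " + String.join(",", z);
                    }
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(findTransitivityViolation("a", "b", "c", "d"));
    }
}
```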

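However, I noticed that changing compareTo() to always return 0 causes every pair to be treated as the same key, resulting in this output:
156483558 c b 1000000
Changing hashCode() to always return 0 instead (same input data as above) produces the same result as before, just with every hash printed as zero: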
0 a a 62822
0 a b 62516
0 a c 62248
0 a d 62495
0 b a 62334
0 b b 62232
0 b d 62759
0 c a 62200
0 c b 124966
0 c c 62347
0 d c 125047
0 d a 62653
0 d b 62603
0 d d 62778
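Edit:
I investigated further by making compareTo() print what it compares. This showed that some keys, such as a,b and b,a, are never compared with each other and are therefore not grouped.

How can the grouping work if not all keys are compared with each other (other than through hashCode(), which it does not use for this)?

I think I'm missing something small. I'd be glad for any ideas! Thanks a lot in advance.
Best regards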

Answer #1 by knpiaxh1

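The problem was in the compareTo() function. The fix: first check whether the pairs are equal in the sense that a,b equals b,a. If they are not, compare the smaller elements of the two pairs first and, if those match, compare their larger elements. That solved the problem.
This is how I implemented it now: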

@Override
public int compareTo(Pair other){
    int cmpFirstFirst = first.compareTo(other.first);
    int cmpSecondSecond =  second.compareTo(other.second);
    int cmpFirstSecond = first.compareTo(other.second);
    int cmpSecondFirst =  second.compareTo(other.first);

    if ( cmpFirstFirst == 0 && cmpSecondSecond == 0 || cmpFirstSecond == 0 && cmpSecondFirst == 0) {
        return 0;
    }

    String thisSmaller;
    String otherSmaller;

    String thisBigger;
    String otherBigger;

    if ( this.first.compareTo(this.second) < 0 ) {
        thisSmaller = this.first;
        thisBigger = this.second;
    } else {
        thisSmaller = this.second;
        thisBigger = this.first;
    }

    if ( other.first.compareTo(other.second) < 0 ) {
        otherSmaller = other.first;
        otherBigger = other.second;
    } else {
        otherSmaller = other.second;
        otherBigger = other.first;
    }

    int cmpThisSmallerOtherSmaller = thisSmaller.compareTo(otherSmaller);
    int cmpThisBiggerOtherBigger = thisBigger.compareTo(otherBigger);

    if (cmpThisSmallerOtherSmaller == 0) {
        return cmpThisBiggerOtherBigger;
    } else {
        return cmpThisSmallerOtherSmaller;
    }
}

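This means that, contrary to my assumption, the map output is not grouped by comparing every key with every other key (a cross product). The keys are sorted and only neighboring keys are compared, which relies on compareTo() being transitive, so a stable, consistent key order is necessary. Once you know and understand that, it makes complete sense.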
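The effect can be reproduced outside Hadoop with a simplified model: sort the keys, then start a new group whenever a key compares unequal to its predecessor. This is only an approximation (Hadoop sorts and merges spill files rather than calling List.sort), and the class, comparator, and helper names are hypothetical. With the original, non-transitive ordering, the keys a,b / b,a / a,c end up in three groups because a,c is sorted between a,b and b,a; with the answer's ordering they form two.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified model of reduce-side grouping: sort, then group runs of equal neighbours.
public class SortGroupingDemo {

    // The question's original ordering (not transitive).
    static final Comparator<String[]> ORIGINAL = (p, q) -> {
        boolean same = p[0].equals(q[0]) && p[1].equals(q[1]);
        boolean swapped = p[0].equals(q[1]) && p[1].equals(q[0]);
        if (same || swapped) {
            return 0;
        }
        int cmp = p[0].compareTo(q[0]);
        return cmp != 0 ? cmp : p[1].compareTo(q[1]);
    };

    // The answer's ordering: compare the smaller elements first, then the larger ones.
    static final Comparator<String[]> FIXED = (p, q) -> {
        int cmp = min(p).compareTo(min(q));
        return cmp != 0 ? cmp : max(p).compareTo(max(q));
    };

    static String min(String[] pair) {
        return pair[0].compareTo(pair[1]) <= 0 ? pair[0] : pair[1];
    }

    static String max(String[] pair) {
        return pair[0].compareTo(pair[1]) <= 0 ? pair[1] : pair[0];
    }

    // Sorts a copy of the keys and counts how many reduce() calls the groups would yield.
    static int countGroups(List<String[]> keys, Comparator<String[]> cmp) {
        List<String[]> sorted = new ArrayList<>(keys);
        sorted.sort(cmp);
        int groups = 0;
        for (int i = 0; i < sorted.size(); i++) {
            if (i == 0 || cmp.compare(sorted.get(i - 1), sorted.get(i)) != 0) {
                groups++;
            }
        }
        return groups;
    }

    static List<String[]> sampleKeys() {
        return Arrays.asList(
                new String[] {"a", "b"},
                new String[] {"b", "a"},
                new String[] {"a", "c"});
    }

    public static void main(String[] args) {
        System.out.println("original: " + countGroups(sampleKeys(), ORIGINAL) + " groups");
        System.out.println("fixed:    " + countGroups(sampleKeys(), FIXED) + " groups");
    }
}
```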

Answer #2 by bnlyeluc

Given the initial requirement that {a,b} =:= {b,a}, wouldn't it be easier to have the tuple elements sorted in the constructor?

public Pair(String first, String second) {
    boolean swap = first.compareTo(second) > 0;
    this.first = swap ? second : first;
    this.second = swap ? first : second;
}

This would simplify methods such as compareTo() and equals(), and it would remove the need for a partitioner.
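A stripped-down sketch of this suggestion in plain Java, assuming a hypothetical class `NormalizedPair` that implements only Comparable; a Hadoop version would additionally implement WritableComparable with the same write()/readFields() as the question. Because write() serializes the already-normalized fields, readFields() restores them in normalized order too (the fields just cannot be final there).

```java
import java.util.Objects;

// Normalizes element order in the constructor, so (b, a) is stored as (a, b).
public class NormalizedPair implements Comparable<NormalizedPair> {

    private final String first;   // invariant: first.compareTo(second) <= 0
    private final String second;

    public NormalizedPair(String first, String second) {
        boolean swap = first.compareTo(second) > 0;
        this.first = swap ? second : first;
        this.second = swap ? first : second;
    }

    // Plain lexicographic order on the normalized fields, which is transitive.
    @Override
    public int compareTo(NormalizedPair other) {
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : second.compareTo(other.second);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof NormalizedPair)) {
            return false;
        }
        NormalizedPair other = (NormalizedPair) o;
        return first.equals(other.first) && second.equals(other.second);
    }

    // Consistent with equals; an ordinary field-based hash is enough after normalization.
    @Override
    public int hashCode() {
        return Objects.hash(first, second);
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
```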

Answer #3 by ykejflvf

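I think I see the problem: you haven't implemented a partitioner.
Since you say you run into the problem with large data sets, I assume you are using multiple reducers. With a single reducer, your code would work. With multiple reducers, however, you need a partitioner that tells the framework that ab and ba are essentially the same key and must go to the same reducer.
Here is a link with an explanation: link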
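Whether a custom partitioner is needed depends on hashCode(): Hadoop's default HashPartitioner assigns a key to reducer (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, so a hashCode() that is symmetric in the two elements already sends a,b and b,a to the same reducer, while an order-sensitive one can split them. A plain-Java sketch of that formula; `symmetricHash` and `orderSensitiveHash` are hypothetical helpers, not Hadoop APIs:

```java
import java.util.Objects;

// Plain-Java model of how Hadoop's default HashPartitioner chooses a reducer.
public class PartitionCheck {

    static int partitionFor(int keyHash, int numReduceTasks) {
        // Clearing the sign bit keeps the result non-negative for negative hash codes.
        return (keyHash & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Hypothetical order-insensitive hash: addition is commutative.
    static int symmetricHash(String first, String second) {
        return first.hashCode() + second.hashCode();
    }

    // Order-sensitive hash, for contrast: element order changes the result.
    static int orderSensitiveHash(String first, String second) {
        return Objects.hash(first, second);
    }

    public static void main(String[] args) {
        int reducers = 4;
        System.out.println(partitionFor(symmetricHash("a", "b"), reducers) + " "
                + partitionFor(symmetricHash("b", "a"), reducers));
        System.out.println(partitionFor(orderSensitiveHash("a", "b"), reducers) + " "
                + partitionFor(orderSensitiveHash("b", "a"), reducers));
    }
}
```

With four reducers, the symmetric hash puts both orderings in partition 3, while the order-sensitive one sends a,b to partition 2 and b,a to partition 0.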
