How do I use multiple fields in MapReduce?

eqoofvh9 posted on 2021-06-03 in Hadoop

I would like to understand how to aggregate over multiple fields with the MapReduce model.
For example, suppose I have a data file like this:

id, site, name, qty, price
00, testA, NameA,1,1
01, testB,NameA,2,3
02, testB,NameB,5,7

and I want to implement this aggregation in MapReduce:

select site,name, (qty*price) as total
from PO where name ='NameA' 
group by site,name,total 
order by site;

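How should I do this?
I can aggregate by site (key) and total (value), but I am not sure how to include the name column.
I need to understand how to work with multiple fields in MapReduce. Is there an example I can look at, or do I need to use HBase?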

ldfqzlk8 #1

You can implement WritableComparable to create your own CompositeKey with several fields, for example:

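import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Nested inside the job class, hence "static".
public static class CompositeKey implements WritableComparable<CompositeKey> {
    public final Text site;
    public final Text name;
    public final LongWritable total;

    // Hadoop creates key instances reflectively and then calls readFields(),
    // so a no-argument constructor with non-null fields is required.
    public CompositeKey() {
        this(new Text(), new Text(), new LongWritable());
    }

    public CompositeKey(Text site, Text name, LongWritable total) {
        this.site = site;
        this.name = name;
        this.total = total;
    }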

    @Override
    public int compareTo(CompositeKey o) {
        int siteCmp = site.compareTo(o.site);
        if (siteCmp != 0) {
            return siteCmp;
        }
        int nameCmp = name.compareTo(o.name);
        if (nameCmp != 0) {
            return nameCmp;
        }
        return total.compareTo(o.total);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        CompositeKey that = (CompositeKey) o;

        if (name != null ? !name.equals(that.name) : that.name != null) return false;
        if (site != null ? !site.equals(that.site) : that.site != null) return false;
        if (total != null ? !total.equals(that.total) : that.total != null) return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result = site != null ? site.hashCode() : 0;
        result = 31 * result + (name != null ? name.hashCode() : 0);
        result = 31 * result + (total != null ? total.hashCode() : 0);
        return result;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        site.write(dataOutput);
        name.write(dataOutput);
        total.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        site.readFields(dataInput);
        name.readFields(dataInput);
        total.readFields(dataInput);
    }
}
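
To see how such a key drives the grouping and ordering in the query from the question, here is a minimal sketch in plain Java with no Hadoop dependency. It is an illustration under assumptions, not the actual job: the class name CompositeKeyDemo, the Key stand-in, and the map and run helpers are all hypothetical, and a TreeMap plays the role of the framework's sort and shuffle. The map step applies the name = 'NameA' filter and computes qty * price; the sorted map then groups identical (site, name, total) keys and orders them by site first, using the same field order as compareTo above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CompositeKeyDemo {

    // Plain-Java stand-in for the Hadoop CompositeKey (hypothetical helper):
    // compares by site, then name, then total.
    static final class Key implements Comparable<Key> {
        final String site;
        final String name;
        final long total;

        Key(String site, String name, long total) {
            this.site = site;
            this.name = name;
            this.total = total;
        }

        @Override
        public int compareTo(Key o) {
            int c = site.compareTo(o.site);
            if (c != 0) return c;
            c = name.compareTo(o.name);
            if (c != 0) return c;
            return Long.compare(total, o.total);
        }

        @Override
        public String toString() {
            return site + "," + name + "," + total;
        }
    }

    // "Map" step: parse one CSV line, apply the WHERE filter,
    // and return the composite key, or null if the record is dropped.
    static Key map(String line) {
        String[] f = line.split(",");
        if (f.length != 5) return null;
        String name = f[2].trim();
        if (!name.equals("NameA")) return null;
        long qty = Long.parseLong(f[3].trim());
        long price = Long.parseLong(f[4].trim());
        return new Key(f[1].trim(), name, qty * price);
    }

    // "Shuffle + reduce" step: the TreeMap sorts and groups keys via compareTo,
    // which mimics GROUP BY site, name, total ORDER BY site.
    static List<String> run(List<String> lines) {
        Map<Key, Integer> groups = new TreeMap<>();
        for (String line : lines) {
            Key k = map(line);
            if (k != null) groups.merge(k, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (Key k : groups.keySet()) out.add(k.toString());
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
            "00, testA, NameA,1,1",
            "01, testB,NameA,2,3",
            "02, testB,NameB,5,7");
        for (String row : run(input)) System.out.println(row);
    }
}
```

In a real job, the body of map would live in a Mapper that emits CompositeKey as the output key, and the reducer would receive one call per distinct key, already sorted by site.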
