风暴三叉戟平均聚合器

h79rfbju  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(375)

我是trident的新手,希望创建一个类似于“sum()”的“average”聚合器,但用于“average”。以下操作不起作用:

public class Average implements CombinerAggregator<Long>.......{

       public Long init(TridentTuple tuple)
       {
       (Long)tuple.getValue(0);
        }
        public Long Combine(long val1,long val2){
        return val1+val2/2;
        }
        public Long zero(){
        return 0L;
         }
       }

它可能在语法上并不完全正确,但这就是它的想法。如果可以请帮忙。给定值为[2,4,1]和[2,2,5]的两个元组以及字段“a”、“b”和“c”,对字段“b”进行平均应返回“3”。我不太清楚init()和zero()是如何工作的。
非常感谢你的帮助。
伊莱

4dbbbstv

4dbbbstv1#

public class Average implements CombinerAggregator<Number> {

int count = 0;
double sum = 0;

@Override
public Double init(final TridentTuple tuple) {
    this.count++;
    if (!(tuple.getValue(0) instanceof Double)) {

        double d = ((Number) tuple.getValue(0)).doubleValue();

        this.sum += d;

        return d;
    }

    this.sum += (Double) tuple.getValue(0);
    return (Double) tuple.getValue(0);

}

@Override
public Double combine(final Number val1, final Number val2) {
    return this.sum / this.count;

}

@Override
public Double zero() {
    this.sum = 0;
    this.count = 0;
    return 0D;
}
}
4xy9mtcn

4xy9mtcn2#

我是一个完全的新手,当谈到三叉戟以及,所以我不完全是如果以下将工作。但它可能:

public class AvgAgg extends BaseAggregator<AvgState> {
    static class AvgState {
        long count = 0;
        long total = 0;

        double getAverage() {
            return total/count;
        }
    }

    public AvgState init(Object batchId, TridentCollector collector) {
        return new AvgState();
    }

    public void aggregate(AvgState state, TridentTuple tuple, TridentCollector collector) {
        state.count++;
        state.total++;
    }

    public void complete(AvgState state, TridentCollector collector) {
        collector.emit(new Values(state.getAverage()));
    }
}

相关问题