如何在状态中聚合

gcuhipw9  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(348)

我有一个键控数据流,看起来像:

{
        summary:Integer
        uid:String
        key:String
        .....
    }

我需要在某个时间范围内聚合摘要值,一旦我获得了一个特定的数字,就会将摘要和所有影响摘要的uid刷新到数据库/日志文件中。
在第一次刷新之后,我想从内存中释放所有uid,并立即刷新每个新项。
所以我尝试了这个聚合函数。

public class AggFunc implements AggregateFunction<Item, Acc, Tuple2<Integer,List<String>>>{

    private static final long serialVersionUID = 1L;

    @Override
    public Acc createAccumulator() {
        return new Acc());
    }

    @Override
    public Acc add(Item value, Acc accumulator) {
        accumulator.inc(value.getSummary());
        accumulator.addUid(value.getUid);
        return accumulator;
    }

    @Override
    public Tuple2<Integer,List<String>> getResult(Acc accumulator) {
        List<String> newL = Lists.newArrayList(accumulator.getUids());
        accumulator.setUids(Lists.newArrayList());
        return Tuple2.of(accumulator.getSum(), newL);
    }

    @Override
    public Acc merge(Acc a, Acc b) {
        .....
    }

}

在aggregate process函数中,我将列表刷新为state,如果需要保存到数据库,我将清除state中的state和save标志以指示它。
但在我看来这是不公平的。我不确定这是否对我有效。
有没有更好的解决办法?

hl0ma9xz

hl0ma9xz1#

使用富函数中的状态。继续添加 uid 当窗口触发刷新值时。官方文件中的这一页有一个例子。
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-键控状态
为了你的案子 ListState 会很管用的。
编辑:
上述解决方案适用于非窗口情况。对于窗口情况,只需使用agggation with apply函数,它可以具有丰富的窗口函数

相关问题