使用apacheflink的java数据聚合

lqfhib0f  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(431)

我正在尝试创建一个使用flink数据流api的实时数字计数器。但是我要解决一些问题。
示例:数据有效负载

{
    "room": 1,  # Room Number
    "numbers": [101, 111, 201, 211, 13, ....], # Only these numbers in output with count
    "my_number": 401  # My Current Number according to room
}

只有4个房间1、2、3和4,我的房间号会因房间而异。这是我传递给flink的流数据。
问题陈述:我想根据房间来计算数字,而在输出中只想返回带有计数的数组数字。每个房间都一样。

output example:
 [
    {
        101: 2,
        111: 5,
        201: 1
        .
        .
        .
    }
 ]
uyhoqukh

uyhoqukh1#

如果我理解正确,你可以这样做:

dataPayloadSource.keyby("room").process(new CountNumbers()).flatMap(new MapDataPayloadToCorrespondObject())addSink(...);

// ...
public class CountNumbers extends KeyedProcessFunction<..>{
    private MapState<Integer, Integer> numberCountState;

    public void open(Configuration config){
        // initialize state in here
    }

    public void processElement(DataPayload dp){
        // for each numbers in the dp.counts, get the state value with numberCountState.get(..)
        // check it returns null, if yes, map does not have that key, initialize with 1
        // if not null, then get the current value from the map, increment by 1
        // update the mapstate
    } 
}

// ...
public class MapDataPayloadToCorrespondObject extends RichFlatMapFunction<...>{
    public void flatMap(...){
        // convert DataPayload to OutputObject
    }
}

相关问题