使用flinkdatastreamapi的java实时数据聚合和数据过滤

6ojccjat  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(227)

我正在尝试创建一个使用flink数据流api的实时数字计数器。但是我要解决一些问题。
假设user-1发送数据负载

{
    "room": 1,  # Room Number
    "numbers": [101, 111, 201, 211, ....],  # Only these numbers in output with count
    "my_number": 401  # My Current Number according to room
}

用户2发送数据有效负载

{
    "room": 1,  # Room Number
    "numbers": [201, 211, 301, 311, ....],  # Only these numbers in output with count
    "my_number": 101  # My Current Number according to room
}

然后user-1将接收数字[101、111、201、211….],每个数字的总计数
并且用户2将接收带有每个数字总计数的数字[201、211、201、311,…]
示例输出

{
    "room": 1,  # Room Number
    "numbers": [(201, 2), (211, 3), (301, 1), (311, 5)]
}

需要向每个用户返回我的\numbers键,并需要返回用户将在数据流中发送的数组编号。
我做了计数部分,但我停留在过滤,我需要过滤每个用户的号码。

class NumberCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<StreamData> streamSource = env.addSource(...);

        DataStream<OutputStreamData> outputDataStream =
                                            streamSource
                                                .flatMap( new CountNumber())
                                                .keyBy("my_number")
                                                .timeWindow(Time.seconds(5))
                                                .sum("value");

        outputDataStream.print();
        env.execute("Number Count");
    }
}

public class CountNumber implements FlatMapFunction<StreamData, OutputStreamData> {

    @Override
    public void flatMap(StreamData streamData, Collector<OutputStreamData> collector) throws Exception {
        collector.collect(new OutputStreamData(streamData.getRoom(), streamData.getMyNumber(), 1));
    }
}

我不知道如何达到预期的结果。有人能帮我得到正确的结果吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题