我正在尝试创建一个使用flink数据流api的实时数字计数器。但是我要解决一些问题。
假设user-1发送数据负载
{
"room": 1, # Room Number
"numbers": [101, 111, 201, 211, ....], # Only these numbers in output with count
"my_number": 401 # My Current Number according to room
}
用户2发送数据有效负载
{
"room": 1, # Room Number
"numbers": [201, 211, 301, 311, ....], # Only these numbers in output with count
"my_number": 101 # My Current Number according to room
}
然后user-1将接收数字[101、111、201、211….],每个数字的总计数
并且用户2将接收带有每个数字总计数的数字[201、211、201、311,…]
示例输出
{
"room": 1, # Room Number
"numbers": [(201, 2), (211, 3), (301, 1), (311, 5)]
}
需要向每个用户返回我的\numbers键,并需要返回用户将在数据流中发送的数组编号。
我做了计数部分,但我停留在过滤,我需要过滤每个用户的号码。
class NumberCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<StreamData> streamSource = env.addSource(...);
DataStream<OutputStreamData> outputDataStream =
streamSource
.flatMap( new CountNumber())
.keyBy("my_number")
.timeWindow(Time.seconds(5))
.sum("value");
outputDataStream.print();
env.execute("Number Count");
}
}
public class CountNumber implements FlatMapFunction<StreamData, OutputStreamData> {
@Override
public void flatMap(StreamData streamData, Collector<OutputStreamData> collector) throws Exception {
collector.collect(new OutputStreamData(streamData.getRoom(), streamData.getMyNumber(), 1));
}
}
我不知道如何达到预期的结果。有人能帮我得到正确的结果吗?
暂无答案!
目前还没有任何答案,快来回答吧!