在使用翻滚窗口的apache flink应用程序中遇到问题。窗口大小是10秒,我希望每10秒有一个resultset数据流。但是,当最新窗口的结果集总是延迟时,除非我将进一步的数据推送到源流。
例如,如果我在“01:33:40.0”和“01:34:00.0”之间将多个记录推送到源流,然后停止查看日志,则不会发生任何事情。
我在“01:37:xx”上再次推送一些数据,然后将获得“01:33:40.0”和“01:34:00.0”之间窗口的结果集,这是不期望的,因为下游接收器逻辑期望结果集准时出现。
任何提示,以改善这将是非常感谢。谢谢。
日志如下:
"log timestamp": "2019-11-15 01:37:45",
"message": "resultSet output: CLASS: 13 CNT: 1 from: 2019-11-15 01:33:40.0 to: 2019-11-15 01:34:00.0\n",
下面是代码段:
Table resultTable = tableEnv.sqlQuery(""+
"SELECT " +
" CAST (N02_001 AS VARCHAR(10)) AS RAILWAY_CLASS, " +
" COUNT(*) RAILWAY_CLASS_COUNT, " +
" TUMBLE_START(rowtime, INTERVAL '20' SECOND) as WINDOW_START, " +
" TUMBLE_END(rowtime, INTERVAL '20' SECOND) as WINDOW_END " +
" FROM Inputs " +
" GROUP BY TUMBLE(rowtime, INTERVAL '20' SECOND), CAST (N02_001 AS VARCHAR(10))");
TupleTypeInfo<Tuple4<String, Long, Timestamp, Timestamp>> tupleType = new TupleTypeInfo<>(
Types.STRING,
Types.LONG,
Types.SQL_TIMESTAMP,
Types.SQL_TIMESTAMP);
DataStream<Tuple4<String, Long, Timestamp, Timestamp>> resultSet = tableEnv.toAppendStream(resultTable, tupleType);
resultSet
.map((Tuple4<String, Long, Timestamp, Timestamp> value) -> {
String output = "CLASS: " + value.f0 + " CNT: " + value.f1 + " from: " + value.f2 + " to: " + value.f3 + "\n";
log.warn("resultSet output: " + output);
return value;
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP));
1条答案
按热度按时间olhwl3o21#
这是您正在使用的预期行为
EventTime
,这意味着用于关闭窗口和跟踪应用程序中的时间流的水印来自事件时间戳。这意味着如果没有事件,就不会有时间流,因此现在将生成窗口。这就是你所观察到的。你正在经历的行为很可能来自你正在使用
AssignerWithPunctuatedWatermark
,它为每个事件发出时间戳和水印。如果你换成AssignerWithPeriodicWatermark
即使不存在数据,也应该生成水印,并关闭并发出窗口。