我有一个类似于Apache·Flink翻滚窗口延迟结果的问题。不同的是,我使用的sql使用kafka connect从主题中读取记录。我定期获取记录,但不知何故,我没有获取输出中的最后几条记录。例如,kafka主题中的最后一条记录的时间戳为2020-11-26t13:11:36.605z,聚合值的最后一条时间戳为2020-11-26t12:59:59.999。我不明白为什么我没有得到topic中最后一条记录的汇总。请帮忙。这是我的密码。
sourceSQL = "CREATE TABLE flink_read_kafka (clientId INT, orderId INT, contactTimeStamp, WATERMARK FOR contactTimeStamp AS contactTimeStamp - INTERVAL '5' SECOND with (kafka config) ";
sinkSQL = "CREATE TABLE flink_aggr_kafka (contactTimeStamp STRING, clientId INT, orderCount BIGINT) with (kafka config) ";
aggrSQL = "insert into flink_aggr_kafka SELECT TUMBLE_ROWTIME(contactTimeStamp, INTERVAL '5' MINUTE) as contactTimeStamp, clientId, COUNT(*) orderCount from flink_read_kafka GROUP BY clientId , TUMBLE(commsTimestamp, INTERVAL '5' MINUTE)";
blinkStreamTableEnv.executeSql(sourceSQL);
blinkStreamTableEnv.executeSql(sinkSQL);
blinkStreamTableEnv.executeSql(aggrSQL);
1条答案
按热度按时间ecbunoof1#
首先,一些背景:翻滚窗口只在水印超过窗口的最大时间戳时才发出结果。水印向框架指示时间戳较低的所有记录都已到达,因此窗口是完整的,可以发出结果。
水印只能根据传入的记录的时间戳前进,因此如果没有更多的记录传入,水印将不会前进,并且当前打开的窗口将不会关闭。因此,当不再有数据流入时,预计最后一个窗口将保持打开状态。
在您的示例中,通常假设也会发出行时间为2020-11-26t13:04:59.999和26t13:09:59.999的窗口,因为最新记录应该将水印推到这些时间戳之外。
我现在可以想到两个原因来解释为什么情况并非如此:
并非所有并行源示例都看到时间戳高于26t13:05:04.999,因此输出水印实际上没有传递该值。您可以通过运行并行度为1的作业来测试这一点,这将缓解问题,或者通过检查flinkwebui中窗口操作符的水印来验证是否存在这种情况。
如果您使用的是kafka producer在一次模式下,并且只使用已提交的记录,那么只有在触发窗口后完成检查点后,这些记录才可见。