我有一个数据流,通常是无序的。我将数据集定义为:
DataStream<ApplicationMetric> metrics = env
.addSource(new FlinkKinesisConsumer<>("applicationMetric", new SimpleStringSchema(), consumerConfig))
.map(mapper)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<>(Time.seconds(30)));
并打印为:
Table table = bsTableEnv.fromDataStream(dataset, "createdAt, name, duration, rowtime.rowtime");
Table t1 = bsTableEnv.sqlQuery("SELECT CAST((createdAt/1000) as TIMESTAMP) as ts, rowtime, name, duration " + table);
bsTableEnv.toAppendStream(t1, Row.class).print();
那么,如果我尝试在带有时间窗口(翻滚等)的sql查询中使用它,这是如何工作的呢?我想我对这些概念的理解可能是根本错误的。但是我假设延迟事件将被删除,并且我可以使用我的rowtime作为滚动窗口属性?
1条答案
按热度按时间e7arh2l61#
您可以控制在datastream api中处理延迟事件的方式。默认情况下,所有大于30秒的事件都会被删除到代码中。这30秒会增加端到端的延迟,所以输出会被延迟,直到flink看到了30秒的数据并考虑了所有的延迟事件。您还可以将延迟事件存储在单独的输出或重新触发计算中。
不幸的是,表api/sql不支持这种配置。延迟事件总是会被删除,但对于您的用例来说,这似乎已经足够了。