I am trying to implement an event-time temporal join in Flink. Here is the first join table:
tEnv.executeSql("CREATE TABLE AggregatedTrafficData_Kafka (" +
        "`timestamp` TIMESTAMP_LTZ(3)," +
        "`area` STRING," +
        "`networkEdge` STRING," +
        "`vehiclesNumber` BIGINT," +
        "`averageSpeed` INTEGER," +
        "WATERMARK FOR `timestamp` AS `timestamp`" +
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'seneca.trafficdata.aggregated'," +
        "'properties.bootstrap.servers' = 'localhost:9092'," +
        "'properties.group.id' = 'traffic-data-aggregation-job'," +
        "'format' = 'json'," +
        "'json.timestamp-format.standard' = 'ISO-8601'" +
        ")");
This table is used as a sink for the following query:
Table aggregatedTrafficData = trafficData
        .window(Slide.over(lit(30).seconds())
                .every(lit(15).seconds())
                .on($("timestamp"))
                .as("w"))
        .groupBy($("w"), $("networkEdge"), $("area"))
        .select(
                $("w").end().as("timestamp"),
                $("area"),
                $("networkEdge"),
                $("plate").count().as("vehiclesNumber"),
                $("speed").avg().as("averageSpeed")
        );
Here is the other join table. I am streaming a Postgres table into Kafka using Debezium:
tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
        "`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
        "`urn` STRING," +
        "`flow_rate` INTEGER," +
        "PRIMARY KEY(`urn`) NOT ENFORCED," +
        "WATERMARK FOR `timestamp` AS `timestamp`" +
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'seneca.network.transport_network_edge'," +
        "'scan.startup.mode' = 'latest-offset'," +
        "'properties.bootstrap.servers' = 'localhost:9092'," +
        "'properties.group.id' = 'traffic-data-aggregation-job'," +
        "'format' = 'debezium-json'," +
        "'debezium-json.schema-include' = 'true'" +
        ")");
And finally, the temporal join:
Table transportNetworkCongestion = tEnv.sqlQuery("SELECT AggregatedTrafficData_Kafka.`timestamp`, `networkEdge`, " +
"congestion(`vehiclesNumber`, `flow_rate`) AS `congestion` FROM AggregatedTrafficData_Kafka " +
"JOIN TransportNetworkEdge_Kafka FOR SYSTEM_TIME AS OF AggregatedTrafficData_Kafka.`timestamp` " +
"ON AggregatedTrafficData_Kafka.`networkEdge` = TransportNetworkEdge_Kafka.`urn`");
The problem I am having is that the join only works for the first few seconds (right after an update in the Postgres table), but I need it to keep joining the first table with the Debezium one. Am I doing something wrong? Thanks
1 Answer
Event-time temporal joins with the AS OF syntax depend on watermarks. When Flink SQL's temporal operators are applied to event-time streams, watermarks play a key role in determining both when results are produced and when state is cleared.

With a temporal join, the join operator keeps track of the watermarks it has received from its two input channels, and its current watermark is always the minimum of those two watermarks. That is why your join stalls, and only makes progress when flow_rate is updated.
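The min-of-both-inputs rule can be sketched without any Flink dependency at all. The class and the watermark values below are purely illustrative (not Flink API); they just show why the rarely-updated Debezium stream holds the whole join back:

public class JoinWatermarkSketch {
    // A two-input streaming operator's watermark is the minimum of the
    // last watermark seen on each input.
    static long combined(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    public static void main(String[] args) {
        long aggregated = 1_000L;  // advances with every window of AggregatedTrafficData_Kafka
        long networkEdge = 100L;   // only advances when a flow_rate row changes in Postgres
        System.out.println(combined(aggregated, networkEdge)); // prints 100
    }
}

Until the TransportNetworkEdge_Kafka side emits a newer watermark, the join's watermark stays pinned at the smaller value and no further results are produced.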
One way to work around this is to define the watermark for the TransportNetworkEdge_Kafka table like this:
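The original snippet appears to have been lost here. Based on the description that follows, the watermark expression presumably pushed the timestamp far into the future, along these lines (the one-year offset is an illustrative guess, not necessarily the answer's original value):

"WATERMARK FOR `timestamp` AS `timestamp` + INTERVAL '1' YEAR"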
This sets the watermark for this table/stream to a very large value, which has the effect of making this stream's watermarks irrelevant: they will never be the minimum of the two.
However, this has the drawback of making the join results non-deterministic.