Flink temporal join only works for the first few seconds

cnjp1d6j · posted 2023-04-27 in Apache

I'm trying to implement an event-time temporal join in Flink. Here is the first table in the join:

tEnv.executeSql("CREATE TABLE AggregatedTrafficData_Kafka (" +
            "`timestamp` TIMESTAMP_LTZ(3)," +
            "`area` STRING," +
            "`networkEdge` STRING," +
            "`vehiclesNumber` BIGINT," +
            "`averageSpeed` INTEGER," +
            "WATERMARK FOR `timestamp` AS `timestamp`" +
            ") WITH (" +
            "'connector' = 'kafka'," +
            "'topic' = 'seneca.trafficdata.aggregated'," +
            "'properties.bootstrap.servers' = 'localhost:9092'," +
            "'properties.group.id' = 'traffic-data-aggregation-job'," +
            "'format' = 'json'," +
            "'json.timestamp-format.standard' = 'ISO-8601'" +
            ")");

This table is used as the sink for the following query:

Table aggregatedTrafficData = trafficData
            .window(Slide.over(lit(30).seconds())
                    .every(lit(15).seconds())
                    .on($("timestamp"))
                    .as("w"))
            .groupBy($("w"), $("networkEdge"), $("area"))
            .select(
                    $("w").end().as("timestamp"),
                    $("area"),
                    $("networkEdge"),
                    $("plate").count().as("vehiclesNumber"),
                    $("speed").avg().as("averageSpeed")
            );
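
(Not shown in the post: the statement that writes this result into the sink table. A minimal sketch, assuming the Table API's executeInsert is used:)

// Sketch: write the windowed aggregation into the Kafka-backed sink table
// declared above; executeInsert submits the insert job.
aggregatedTrafficData.executeInsert("AggregatedTrafficData_Kafka");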

Here is the other table in the join. I'm using Debezium to stream a Postgres table into Kafka:

tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
            "`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
            "`urn` STRING," +
            "`flow_rate` INTEGER," +
            "PRIMARY KEY(`urn`) NOT ENFORCED," +
            "WATERMARK FOR `timestamp` AS `timestamp`" +
            ") WITH (" +
            "'connector' = 'kafka'," +
            "'topic' = 'seneca.network.transport_network_edge'," +
            "'scan.startup.mode' = 'latest-offset'," +
            "'properties.bootstrap.servers' = 'localhost:9092'," +
            "'properties.group.id' = 'traffic-data-aggregation-job'," +
            "'format' = 'debezium-json'," +
            "'debezium-json.schema-include' = 'true'" +
            ")");

And finally, the temporal join:

Table transportNetworkCongestion = tEnv.sqlQuery("SELECT AggregatedTrafficData_Kafka.`timestamp`, `networkEdge`, " +
            "congestion(`vehiclesNumber`, `flow_rate`) AS `congestion` FROM AggregatedTrafficData_Kafka " +
            "JOIN TransportNetworkEdge_Kafka FOR SYSTEM_TIME AS OF AggregatedTrafficData_Kafka.`timestamp` " +
            "ON AggregatedTrafficData_Kafka.`networkEdge` = TransportNetworkEdge_Kafka.`urn`");

The problem I'm having is that the join works only for the first few seconds (after an update in the Postgres table), but I need to keep joining the first table with the Debezium one. Am I doing something wrong? Thanks.

xxhby3vn (answer 1)

A temporal join that uses the AS OF syntax requires:

  • an append-only table with a valid event-time attribute
  • an updating table with a primary key and a valid event-time attribute
  • an equality predicate on the primary key

When Flink SQL's temporal operators are applied to event-time streams, watermarks play a crucial role in determining both when results are produced and when state is cleared.
When performing a temporal join:

  • rows from the append-only table are buffered in Flink state until the join operator's current watermark reaches their timestamps
  • for the versioned table, the latest version of each key whose timestamp precedes the join operator's current watermark is kept in state, along with any versions from after the current watermark
  • whenever the join operator's watermark advances, new results are produced and state that is no longer relevant is cleared

The join operator tracks the watermarks it receives from its two input channels, and its current watermark is always the minimum of those two. This is why your join stalls and only makes progress when flow_rate is updated: the Debezium side's watermark only advances when a change record arrives.
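
To make that concrete, here is an illustrative snippet (not Flink internals) of the two-input watermark rule:

public class MinWatermarkDemo {
    public static void main(String[] args) {
        // Made-up epoch-millis watermarks for illustration only.
        long appendSide = 1_700_000_030_000L;    // traffic stream: advancing steadily
        long versionedSide = 1_700_000_005_000L; // Debezium stream: stalled between updates
        // A two-input operator's event-time clock is the minimum of its
        // input watermarks, so results past the stalled side are held back.
        long joinWatermark = Math.min(appendSide, versionedSide);
        System.out.println(joinWatermark); // prints 1700000005000
    }
}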
One way to work around this is to define the watermark for the TransportNetworkEdge_Kafka table like this:

WATERMARK FOR `timestamp` AS CAST('9999-12-31 00:00:00.000' AS TIMESTAMP(3))

This sets the watermark for that table/stream to a very large value, which effectively makes the watermark from this stream irrelevant: it will never be the minimum.
However, this has the drawback of making the join results non-deterministic.
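
Applied to the table definition from the question, the change would look like this (a sketch; only the WATERMARK clause differs from the original DDL):

tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
            "`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
            "`urn` STRING," +
            "`flow_rate` INTEGER," +
            "PRIMARY KEY(`urn`) NOT ENFORCED," +
            // Constant far-future watermark: this side never holds the join back.
            "WATERMARK FOR `timestamp` AS CAST('9999-12-31 00:00:00.000' AS TIMESTAMP(3))" +
            ") WITH (" +
            "'connector' = 'kafka'," +
            "'topic' = 'seneca.network.transport_network_edge'," +
            "'scan.startup.mode' = 'latest-offset'," +
            "'properties.bootstrap.servers' = 'localhost:9092'," +
            "'properties.group.id' = 'traffic-data-aggregation-job'," +
            "'format' = 'debezium-json'," +
            "'debezium-json.schema-include' = 'true'" +
            ")");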
