ksql-确定何时加载表

5q4ezhmt  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(400)

如何确定ksql何时已将kafka主题中的数据完全加载到表中?
目标:选择2个Kafka主题,加入其中,并将结果写入一个新的Kafka主题。
例子:
我使用ksql的restapi发出以下命令。

  1. CREATE TABLE MyTable (A1 VARCHAR, A2 VARCHAR) WITH (kafka_topic='topicA', key='A1', value_format='json');
  2. CREATE STREAM MyStream (B1 varchar, B2 varchar) WITH (kafka_topic='topicB', value_format='json');
  3. CREATE STREAM MyDestination WITH (Kafka_topic='topicC', PARTITIONS = 1, value_format='json') AS SELECT a.A1 as A1, a.A2 as A2, b.B1 as B1, b.B2 as B2 FROM MyStream b left join MyTable a on a.A1 = b.B1;

问题:topicc只有来自topicb的数据,并且所有联接的值都为null。
尽管我从createtable命令收到了一个成功状态,但数据似乎还没有完全加载到表中。因此,第3个命令的结果只包含流中的数据,不包含表中的数据。如果在执行join命令之前人为地延迟,那么生成的主题将正确地包含来自这两个主题的数据。如何确定何时加载表,以及执行join命令是否安全?

cuxqih21

cuxqih211#

ksql中的表(以及底层的kafka流)有一个时间维度,即随时间演化的表。对于流表联接,每个流记录都与“正确”的表版本联接(即,表按时间进行版本控制)。
在即将发布的CP5.1版本中,您可以通过确保表主题的所有记录时间戳小于流主题的记录时间戳来“预加载”表。这告诉ksql,它需要首先处理表主题数据,但是在开始加入之前,相应地提前表时间戳版本。
有关详细信息,请查看:https://www.confluent.io/resources/streams-tables-two-sides-same-coin

bprjcwpo

bprjcwpo2#

这的确是个大问题。在这一点上,ksql无法仅在表完全加载后自动执行流表联接。这确实是一个有用的特性。这里讨论的是一个更普遍和相关的问题:https://github.com/confluentinc/ksql/issues/1751

相关问题