我正在尝试使用ksql在一个时间限制内做任何我能做的处理,并在那个时间限制内得到结果。有关使用apachebeam说明的相同想法,请参见“处理时间计时器”下的使用apachebeam进行及时(和有状态)处理。
鉴于:
具有唯一密钥的事务流;
更新同一流中的这些事务;和
一个下游处理器,它希望在事务出现在第一个流中后的特定超时时间(比如20秒)接收更新的事务。
从概念上讲,我在考虑创建第一个流的ktable来保存事务的最新状态,并使用ksql通过查询ktable来创建一个输出流,其中的键(create\u time+timeout)<current\u time(并将超时作为“更新”添加到第一个流中,以便我可以从ktable中筛选出那些超时)
我还没有在ksql文档中找到这样做的方法,即使有一个内置的当前\u时间,我也不确定它是否会被评估,直到另一个记录出现。
如何在ksql中执行此操作?我需要自定义自定义项吗?如果不能在ksql中完成,我可以在kstreams中完成吗?
更新:现在看来kstreams不支持这个功能-apache flink似乎是这个用例(以及其他许多用例)的发展方向。如果你知道一个聪明的方法绕过kstreams的限制,告诉我!
1条答案
按热度按时间ha5z0ras1#
看一看这个
punctuate()
Kafka流处理器api中的功能,这可能是您正在寻找的。您可以将标点()用于流时间(默认值:事件时间)以及处理时间(通过PunctuationType.WALL_CLOCK_TIME
). 在这里,您将实现Processor
或者Transformer
,根据您的需要,将使用punctuate()
超时功能。看到了吗https://kafka.apache.org/documentation/streams/developer-guide/processor-api.html 更多信息。
提示:您也可以在kafka流的dsl中使用这样的处理器/转换器。这意味着您可以继续使用更方便的dsl,如果您愿意的话,只需要在基于dsl的代码的正确位置插入处理器/转换器。