我创造了一个 Ktable
主题使用 streamsBuilder.table("myTopic")
,我将其具体化为状态存储,以便使用交互式查询。
每小时,我都要从这个状态存储(以及关联的changelog主题)中删除其值在过去一小时内没有更新的记录。
我相信使用标点符号可以做到这一点,但是到目前为止我只使用了dsl,所以不确定如何继续。如果有人能给我举个例子,我将不胜感激。
谢谢,
杰克
我创造了一个 Ktable
主题使用 streamsBuilder.table("myTopic")
,我将其具体化为状态存储,以便使用交互式查询。
每小时,我都要从这个状态存储(以及关联的changelog主题)中删除其值在过去一小时内没有更新的记录。
我相信使用标点符号可以做到这一点,但是到目前为止我只使用了dsl,所以不确定如何继续。如果有人能给我举个例子,我将不胜感激。
谢谢,
杰克
1条答案
按热度按时间wnrlj8wa1#
可以将处理器api与dsl混合并匹配,但不能处理ktable。您需要转换为kstream。或者,您可以使用与状态存储交互的处理器创建一个新拓扑。
您需要将该状态存储在某个位置—如何确定记录是否早于一小时。一种方法是为状态存储中的每条记录添加一个时间戳。
在处理器的init方法中,可以调用schedule(标点符号)来迭代状态存储中的记录并删除旧记录: