java—如何使用标点符号从状态存储中删除旧记录(Kafka)

hts6caw3  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(370)

我创造了一个 Ktable 主题使用 streamsBuilder.table("myTopic") ,我将其具体化为状态存储,以便使用交互式查询。
每小时,我都要从这个状态存储(以及关联的changelog主题)中删除其值在过去一小时内没有更新的记录。
我相信使用标点符号可以做到这一点,但是到目前为止我只使用了dsl,所以不确定如何继续。如果有人能给我举个例子,我将不胜感激。
谢谢,
杰克

wnrlj8wa

wnrlj8wa1#

可以将处理器api与dsl混合并匹配,但不能处理ktable。您需要转换为kstream。或者,您可以使用与状态存储交互的处理器创建一个新拓扑。
您需要将该状态存储在某个位置—如何确定记录是否早于一小时。一种方法是为状态存储中的每条记录添加一个时间戳。
在处理器的init方法中,可以调用schedule(标点符号)来迭代状态存储中的记录并删除旧记录:

context.schedule(Duration.ofMillis(everyHourInMillis), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    myStateStore.all().forEachRemaining(keyValue -> {
        if (Instant.ofEpochMilli(valueInStateStore).compareTo(olderThanAnHour) < 0) {
            myStateStore.delete(keyValue.key);
        }
    });
});

相关问题