这是一个样本 KTable
我构建了一个简单的聚合:
String name = stream
.groupByKey()
.aggregate(
() -> new Aggregate(config),
(key, value, aggregate) -> aggregate.addAndReturn(value),
Materialized
.<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(CustomSerdes.ObjectSerde()))
.filter(((key, value) -> value.isStateChanged()))
.filter((key, value) -> !value.getRecentlyViewed().isEmpty())
.queryableStoreName();
我需要做的是保存最终的 KTable
(应用筛选后)在状态存储中,而不是在初始 KTable
. 目前 KTable.queryableStoreName()
退货 null
.
我目前的解决办法是 filter()
,然后使用 KTable.toStream()
最后存储为 KTable
我认为这也是低效的。还有别的解决办法吗
1条答案
按热度按时间lstz6jyr1#
您可以强制实现
KTable
通过提供可查询的存储名称:根据所使用的版本,可能需要指定一些泛型以使其可编译: