queryble过滤ktable

5sxhfpxr  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(323)

这是一个样本 KTable 我构建了一个简单的聚合:

String name = stream
    .groupByKey()
    .aggregate(
        () -> new Aggregate(config),
        (key, value, aggregate) -> aggregate.addAndReturn(value),
        Materialized
            .<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
            .withCachingEnabled()
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomSerdes.ObjectSerde()))
    .filter(((key, value) -> value.isStateChanged()))
    .filter((key, value) -> !value.getRecentlyViewed().isEmpty())
    .queryableStoreName();

我需要做的是保存最终的 KTable (应用筛选后)在状态存储中,而不是在初始 KTable . 目前 KTable.queryableStoreName() 退货 null .
我目前的解决办法是 filter() ,然后使用 KTable.toStream() 最后存储为 KTable 我认为这也是低效的。还有别的解决办法吗

lstz6jyr

lstz6jyr1#

您可以强制实现 KTable 通过提供可查询的存储名称:

.aggregate()
.filter(..., Materialized.as("your-custom-store-name"));

根据所使用的版本,可能需要指定一些泛型以使其可编译:

Materialized<KEY_TYPE, VALUE_TYPE, KeyValueStore<Bytes, byte[]>>.as("your-custom-store-name"))

相关问题