apache-kafka 是否可以在Kafka Streams中查询同一拓扑中的聚合存储?

afdcj2ne  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(113)

我有一个案例,我必须针对从这些相同值生成的存储查询每个流值。

final KStream<String, CustomDto> stream = builder.stream(INPUT_TOPIC_NAME);
final KTable<Windowed<CustomKey>, Long> table = stream
    .map((key, value) -> mapFunction(value)) // here we are getting our CustomKey and Long value
    .groupByKey(Grouped.with(new CustomKeySerde(), Serdes.Long()))
    .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(300)))
    .aggregate(
        () -> 0L,
        (aggKey, newValue, aggValue) -> aggValue + newValue,
        Materialized.as(STORE_NAME).with(new CustomKeySerde(), Serdes.Long()));

我需要能够查询通过输入主题传递的每个CustomDto对象的结果存储。我试着在mapFunction内部执行此操作,但似乎存储在那里不可用,即使我事先使用builder.addStateStore添加它。我该如何处理此问题?我阅读了文档,但没有找到任何对我的情况有帮助的内容。

相关问题