我有一个案例,我必须针对从这些相同值生成的存储查询每个流值。
final KStream<String, CustomDto> stream = builder.stream(INPUT_TOPIC_NAME);
final KTable<Windowed<CustomKey>, Long> table = stream
.map((key, value) -> mapFunction(value)) // here we are getting our CustomKey and Long value
.groupByKey(Grouped.with(new CustomKeySerde(), Serdes.Long()))
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(300)))
.aggregate(
() -> 0L,
(aggKey, newValue, aggValue) -> aggValue + newValue,
Materialized.as(STORE_NAME).with(new CustomKeySerde(), Serdes.Long()));
我需要能够查询通过输入主题传递的每个CustomDto对象的结果存储。我试着在mapFunction内部执行此操作,但似乎存储在那里不可用,即使我事先使用builder.addStateStore
添加它。我该如何处理此问题?我阅读了文档,但没有找到任何对我的情况有帮助的内容。
1条答案
按热度按时间cnjp1d6j1#
您将需要使用处理器API来访问状态存储。
https://kafka.apache.org/33/documentation/streams/developer-guide/processor-api.html