据我所知,窗口聚合的更改日志主题应该为每个窗口至少包含一个键/值?
input
.groupByKey() // group by user
.windowedBy(
TimeWindows
.of(Duration.ofSeconds(60))
.advanceBy(Duration.ofSeconds(10))
.grace(Duration.ofSeconds(60)))
.aggregate(
() -> new Aggregate(config),
(userId, msg, aggregate) -> aggregate.addAndReturn(msg),
Materialized
.<String, Aggregate>as(inMemoryWindowStore(
config.getOutputStore(),
Duration.ofSeconds(300),
Duration.ofSeconds(60),
false))
.withCachingDisabled()
.withKeySerde(Serdes.String())
.withValueSerde(new MyCustomSerde()));
当我查询状态存储时,我希望为每个窗口获得一个键/值:
WindowStoreIterator<Aggregate> iter = store.fetch(userId, start, end)
但是要么我什么也得不到(迭代器是空的),要么有时它小于开始到结束之间的实际窗口数。
1条答案
按热度按时间z4bn682m1#
使用的参数
store.fetch(key, startTs, endTs)
不正确。两个时间戳startTs
以及endTs
不要引用单个窗口的开始/结束时间戳,但它是一个时间范围:fetch()
将返回时间范围中包含开始时间戳的所有窗口。旧版本中的javadocs不太好,可能会失去引导作用。新版本改进了javadocs:https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/state/readonlywindowstore.html
请注意,参数类型已更改,并在较新版本中重命名: