注意:这是一个无效的问题。请忽略。
我有一个ksa监听两个主题(两个子拓扑),一个(子拓扑a)写入状态存储,另一个(子拓扑b)读取状态存储。
写
...
stream
.mapValues(v -> new Version(v.getHeader().getOccurredAt().getSeconds(), v.getVersion().getValue()))
.groupByKey()
.aggregate(
() -> new Version(0,0),
(aggKey, newValue, aggValue) -> aggValue.getTimestamp() > newValue.getTimestamp() ? aggValue : newValue,
Materialized.<String, AgentVersion, KeyValueStore<Bytes, byte[]>>as(conf.versionStoreName())
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.serdeFrom(
new Version.Serializer(), new Version.Deserializer())));
阅读
ReadOnlyKeyValueStore<String, Version> getVersionStore() {
return app().store(conf.versionStoreName(), QueryableStoreTypes.keyValueStore());
}
但是,我发现b不能得到a写的数据(a可以正确地得到日期)。
我错过什么了吗?
1条答案
按热度按时间afdcj2ne1#
通过设计,子拓扑相互隔离。如果您想从一个子拓扑访问另一个子拓扑中的存储,则需要将两个子拓扑连接到一个子拓扑中。例如,将相应的存储添加到处理器/转换器。
查阅https://docs.confluent.io/current/streams/architecture.html#stream-分区和任务