无法在kafka流应用程序的子拓扑之间共享状态存储中的数据

oknwwptz  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(647)

注意:这是一个无效的问题。请忽略。
我有一个ksa监听两个主题(两个子拓扑),一个(子拓扑a)写入状态存储,另一个(子拓扑b)读取状态存储。

...
  stream
      .mapValues(v -> new Version(v.getHeader().getOccurredAt().getSeconds(), v.getVersion().getValue()))
      .groupByKey()
      .aggregate(
        () -> new Version(0,0),
        (aggKey, newValue, aggValue) -> aggValue.getTimestamp() > newValue.getTimestamp() ? aggValue : newValue,
      Materialized.<String, AgentVersion, KeyValueStore<Bytes, byte[]>>as(conf.versionStoreName())
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.serdeFrom(
          new Version.Serializer(), new Version.Deserializer())));

阅读

ReadOnlyKeyValueStore<String, Version> getVersionStore() {
  return app().store(conf.versionStoreName(), QueryableStoreTypes.keyValueStore());
}

但是,我发现b不能得到a写的数据(a可以正确地得到日期)。
我错过什么了吗?

afdcj2ne

afdcj2ne1#

通过设计,子拓扑相互隔离。如果您想从一个子拓扑访问另一个子拓扑中的存储,则需要将两个子拓扑连接到一个子拓扑中。例如,将相应的存储添加到处理器/转换器。
查阅https://docs.confluent.io/current/streams/architecture.html#stream-分区和任务

相关问题