Does the compacted changelog topic contain one key per window in Kafka Streams?

Posted by cwdobuhd on 2021-06-04 in Kafka

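As far as I understand, the changelog topic of a windowed aggregation should contain at least one key/value pair per window. Is that right?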

input
  .groupByKey() // group by user
  .windowedBy(
    TimeWindows
      .of(Duration.ofSeconds(60))
      .advanceBy(Duration.ofSeconds(10))
      .grace(Duration.ofSeconds(60)))
  .aggregate(
    () -> new Aggregate(config),
    (userId, msg, aggregate) -> aggregate.addAndReturn(msg),
    Materialized
      .<String, Aggregate>as(inMemoryWindowStore(
        config.getOutputStore(),
        Duration.ofSeconds(300),
        Duration.ofSeconds(60),
        false))
      .withCachingDisabled()
      .withKeySerde(Serdes.String())
      .withValueSerde(new MyCustomSerde()));

When I query the state store, I expect to get one key/value pair per window:

WindowStoreIterator<Aggregate> iter = store.fetch(userId, start, end);

But either I get nothing at all (the iterator is empty), or I get fewer entries than the actual number of windows between start and end.

Answer by z4bn682m

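You are using the parameters of store.fetch(key, startTs, endTs) incorrectly. The two timestamps startTs and endTs do not refer to the start and end timestamps of a single window. Together they define a time range, and fetch() returns all windows whose start timestamp falls within that range.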
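To make the range semantics concrete, here is a small stand-alone Java sketch that models only window start timestamps, not a real store. It assumes the question's hopping windows (60 s size, 10 s advance) and millisecond timestamps, and it assumes that window starts are epoch-aligned multiples of the advance, the way TimeWindows assigns them to our understanding. HoppingWindowFetch, windowStartsFor and fetchStarts are hypothetical names for illustration, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class HoppingWindowFetch {

    // Start timestamps (ms) of every hopping window that contains `timestamp`.
    // Assumption: starts are multiples of advanceMs and a window
    // [start, start + sizeMs) contains t when start <= t < start + sizeMs.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        long start = Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        List<Long> starts = new ArrayList<>();
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Hypothetical emulation of fetch(key, from, to): keep the windows whose START
    // lies in [from, to], both bounds inclusive. The window end is not considered.
    static List<Long> fetchStarts(List<Long> storedStarts, long from, long to) {
        return storedStarts.stream()
                .filter(s -> s >= from && s <= to)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long size = 60_000L;    // TimeWindows.of(Duration.ofSeconds(60))
        long advance = 10_000L; // .advanceBy(Duration.ofSeconds(10))

        // A single record at t = 65 s falls into six overlapping windows.
        List<Long> stored = windowStartsFor(65_000L, size, advance);
        System.out.println("stored window starts: " + stored);

        // Treating [60 s, 120 s] as "one window" only matches the window starting at 60 s.
        System.out.println("fetch(60s, 120s): " + fetchStarts(stored, 60_000L, 120_000L));
        // A range that begins after the last stored start matches nothing.
        System.out.println("fetch(70s, 120s): " + fetchStarts(stored, 70_000L, 120_000L));
    }
}
```

Running main prints the six stored starts 10000 through 60000, then [60000] for the first fetch and [] for the second. That mirrors the two symptoms in the question: fewer results than expected, or an empty iterator.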
The javadocs in older releases are not great and might be misleading. They were improved in newer versions: https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/state/readonlywindowstore.html
Note that in newer versions the parameter types have changed and the parameters have been renamed:

WindowStoreIterator<V> fetch(K key,
                             Instant from,
                             Instant to)

Get all the key-value pairs with the given key and the time range from all the existing windows.
This iterator must be closed after use.

The time range is inclusive and applies to the starting timestamp of the window.
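Because the range applies only to window start timestamps, a query meant to return every window that overlaps an interval [start, end] has to begin its range almost one window size earlier. The plain-Java sketch below assumes the question's 60 s windows with a 10 s advance and millisecond timestamps. OverlapQueryRange, fetchFrom and alignedStartsBetween are hypothetical helpers, and the store.fetch call with Instant arguments appears only in a comment, since running it needs a live Kafka Streams store.

```java
import java.util.ArrayList;
import java.util.List;

public class OverlapQueryRange {

    // Lower bound to pass to fetch() so that every window [s, s + sizeMs) that
    // overlaps [queryStart, queryEnd] is returned. Overlap requires
    // s + sizeMs > queryStart, i.e. s >= queryStart - sizeMs + 1 (in ms).
    // Window starts are never negative, so the bound is clamped at 0.
    static long fetchFrom(long queryStart, long sizeMs) {
        return Math.max(0L, queryStart - sizeMs + 1);
    }

    // Epoch-aligned window starts (multiples of advanceMs) in [from, to], inclusive.
    // Only windows that actually received a record for the key exist in the store,
    // so these are candidate starts, not guaranteed results.
    static List<Long> alignedStartsBetween(long from, long to, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long first = (from + advanceMs - 1) / advanceMs * advanceMs; // round up to a multiple
        for (long s = first; s <= to; s += advanceMs) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60_000L;
        long advance = 10_000L;
        long queryStart = 60_000L;
        long queryEnd = 120_000L;

        long from = fetchFrom(queryStart, size);
        // Against a real ReadOnlyWindowStore this would become, for example:
        // store.fetch(userId, Instant.ofEpochMilli(from), Instant.ofEpochMilli(queryEnd))
        System.out.println("from = " + from);
        System.out.println("candidate window starts: " + alignedStartsBetween(from, queryEnd, advance));
    }
}
```

With these numbers the widened range starts at 1 ms. It covers the twelve candidate windows starting at 10000 through 120000, each of which overlaps [60000, 120000]. The window starting at 0 ends exactly at 60000 and is correctly excluded.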
