kafka流窗口操作在执行偏移重置操作时丢失数据

mm5n2pyu  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(168)

我经历过Kafka流窗口操作可能会丢失数据。让我解释一下我的情况。
我用下面的命令重置了偏移量

  1. /bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --group streamAppId --reset-offsets --by-duration PT37H30M --topic topicNameX --execute

这个命令的结果告诉我偏移量是按35小时30分钟前的时间重置的
然后我重新启动流媒体应用程序。我的窗口持续时间是1秒。。。

  1. WindowBytesStoreSupplier streamStore = Stores.persistentWindowStore(storeName, Duration.ofHours(6), Duration.ofSeconds(1), false);
  2. StringSerializer stringSerializer = new StringSerializer();
  3. StringDeserializer stringDeserializer = new StringDeserializer();
  4. TimeWindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer<>(stringSerializer);
  5. TimeWindowedDeserializer<String> windowedDeserializer = new TimeWindowedDeserializer<>(stringDeserializer, Duration.ofSeconds(1).toMillis());
  6. Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
  7. shcLogKStream.selectKey((k, v) -> v.getKey())
  8. .groupByKey()
  9. .windowedBy(TimeWindows.of(Duration.ofSeconds(1))).aggregate(ValueObjectType::new, (key, value, aggregate) -> {
  10. aggregate.update(value);
  11. return aggregate;
  12. }, Materialized.<String, ValueObjectType>as(streamStore)
  13. .withKeySerde(Serdes.String())
  14. .withValueSerde(kafkaConfiguration.getValueSerde()))
  15. // Some internal operation ...

在这里你看我没有关Windows grace() 所以我知道它的默认值是24小时。。。
在偏移重置操作后,我重新启动流应用程序,检查前1小时,并与旧结果进行比较,注意结果小于偏移重置操作前。
我是不是在丢失数据?或者您认为我的窗口和聚合配置不正确吗?
编辑附加问题,auto.commit.interval和窗口大小之间有关系吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题