我经历过Kafka流窗口操作可能会丢失数据。让我解释一下我的情况。
我用下面的命令重置了偏移量
/bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --group streamAppId --reset-offsets --by-duration PT37H30M --topic topicNameX --execute
这个命令的结果告诉我偏移量是按35小时30分钟前的时间重置的
然后我重新启动流媒体应用程序。我的窗口持续时间是1秒。。。
WindowBytesStoreSupplier streamStore = Stores.persistentWindowStore(storeName, Duration.ofHours(6), Duration.ofSeconds(1), false);
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
TimeWindowedSerializer<String> windowedSerializer = new TimeWindowedSerializer<>(stringSerializer);
TimeWindowedDeserializer<String> windowedDeserializer = new TimeWindowedDeserializer<>(stringDeserializer, Duration.ofSeconds(1).toMillis());
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);
shcLogKStream.selectKey((k, v) -> v.getKey())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(1))).aggregate(ValueObjectType::new, (key, value, aggregate) -> {
aggregate.update(value);
return aggregate;
}, Materialized.<String, ValueObjectType>as(streamStore)
.withKeySerde(Serdes.String())
.withValueSerde(kafkaConfiguration.getValueSerde()))
// Some internal operation ...
在这里你看我没有关Windows grace()
所以我知道它的默认值是24小时。。。
在偏移重置操作后,我重新启动流应用程序,检查前1小时,并与旧结果进行比较,注意结果小于偏移重置操作前。
我是不是在丢失数据?或者您认为我的窗口和聚合配置不正确吗?
编辑附加问题,auto.commit.interval和窗口大小之间有关系吗?
暂无答案!
目前还没有任何答案,快来回答吧!