Kafka流的配置:
threads = 1;
replicationFactor = 1;
ktableCommitInterval= 10000;
ktableMemory=72000000;
timeDuration=10;
拓扑结构:
KStream<Windowed<String>,String> windowedStringKStream =
streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(),Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(),Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(timeDuration)).grace(Duration.ofSeconds(0)))
.reduce(Numners::append,Materialized.<String, String, WindowStore<Bytes,byte[]>>as(storeName).withCachingEnabled().withRetention(Duration.ofSeconds(timeDuration)).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
.toStream();
代码说明:
Code appends numbers in a 10 Second window. Incremental Number records are sent exactly at a interval of 1 second into input topic.
输出:
问题:
提交间隔设置为10秒。缓存大小设置为72 mb。数据以字节为单位。状态存储已启用缓存。文档指出,将数据推送到下游的kafka流的操作语义取决于缓存大小或提交间隔(不管先发生什么)。但是根据实验,提交在一分钟内发生了两次。观察结果是,提交间隔在应用程序启动时开始,而窗口在数据开始出现时开始。如图所示,中间窗口结果被推送,最终窗口结果也被推送。
对于我正在处理的用例,使用suppress()是不可能的,因为如果没有新数据出现在主题上,它将不会刷新数据。
任何帮助都将不胜感激。如果有人正面临这个问题,或者想要复制这个,请告诉我。
1条答案
按热度按时间q8l4jmvw1#
唯一的解决方案是建立一个自定义的
transform()
它实现了一个自定义版本的“suppress”,允许您在没有新的输入数据到达时发出数据(例如,挂钟时间标点可能有助于实现它)。目前(从ApacheKafka2.4版本开始),没有内置的支持。如果你不使用
suppress()
窗口聚合可能总是在窗口实际关闭之前发出一些中间结果,因此无法对kafka流进行不同的配置。