kafka流:将中间窗口结果刷新为提交间隔和窗口时间不同步

iibxawm4  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(394)

Kafka流的配置:

threads = 1;
replicationFactor = 1;
ktableCommitInterval= 10000;
ktableMemory=72000000;
timeDuration=10;

拓扑结构:

KStream<Windowed<String>,String> windowedStringKStream =
                         streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(),Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(),Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(timeDuration)).grace(Duration.ofSeconds(0)))
                        .reduce(Numners::append,Materialized.<String, String, WindowStore<Bytes,byte[]>>as(storeName).withCachingEnabled().withRetention(Duration.ofSeconds(timeDuration)).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
.toStream();

代码说明:

Code appends numbers in a 10 Second window. Incremental Number records are sent exactly at a interval of 1 second into input topic.

输出:

问题:
提交间隔设置为10秒。缓存大小设置为72 mb。数据以字节为单位。状态存储已启用缓存。文档指出,将数据推送到下游的kafka流的操作语义取决于缓存大小或提交间隔(不管先发生什么)。但是根据实验,提交在一分钟内发生了两次。观察结果是,提交间隔在应用程序启动时开始,而窗口在数据开始出现时开始。如图所示,中间窗口结果被推送,最终窗口结果也被推送。
对于我正在处理的用例,使用suppress()是不可能的,因为如果没有新数据出现在主题上,它将不会刷新数据。
任何帮助都将不胜感激。如果有人正面临这个问题,或者想要复制这个,请告诉我。

q8l4jmvw

q8l4jmvw1#

唯一的解决方案是建立一个自定义的 transform() 它实现了一个自定义版本的“suppress”,允许您在没有新的输入数据到达时发出数据(例如,挂钟时间标点可能有助于实现它)。
目前(从ApacheKafka2.4版本开始),没有内置的支持。如果你不使用 suppress() 窗口聚合可能总是在窗口实际关闭之前发出一些中间结果,因此无法对kafka流进行不同的配置。

相关问题