下面是我在kafka流中编写的窗口操作
KTable<Windowed<String>, Test> testWinAlerts = testRecords
.groupByKey()
.aggregate(new TestInitilizer(),
new minMaxCalculator(),
TimeWindows.of(TimeUnit.SECONDS.toMillis(5))
.advanceBy(TimeUnit.SECONDS.toMillis(1)),
MessageSerde,
"win-counts")
.filter((k,v)->{
//Some Operation
return (condition);
})
.toStream((k,v)->k.toString())
.to(Serdes.String(),MessageSerde,"Window-topic");
但在这个操作中,每次新消息出现并更新聚合时,都会调用filter操作。对于每一个新的messege警报都会写入 "Window-topic"
主题。相反,我想要的是,过滤器操作应该在每个窗口执行一次,并将最终结果写入 "Window-topic"
每个窗口一次(5,1)。我们有没有办法做到这一点,减少这些多次通话?
暂无答案!
目前还没有任何答案,快来回答吧!