kstream窗口聚合

9wbgstp7  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(539)

尝试合并多个Kafka流,聚合并产生一个新的主题。但是,在同一个窗口中,代码生成的聚合记录数量与每个输入流中的总输入记录数量相同。我希望聚合只在连接窗口的末尾生成1个输出。在下面的代码中我做错了什么-

val streams = requestStreams.merge(successStreams).merge(errorStreams)
                .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
                .aggregate({ null }, StreamAggregators.notificationMetricAggregator, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
                        .withValueSerde(serdesConfig.notificationMetricSerde()))
                .toStream()

streams.to(notificationStreamsConfig.metricsTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String::class.java, 10), serdesConfig.notificationMetricSerde()))
hi3rlvi2

hi3rlvi21#

默认情况下,kafka streams使用连续更新处理模型。注意,聚合的结果是 KTable . 此结果表包含每个窗口的一行,每次处理新记录时,都会更新窗口(即表中的行)。
如果你打电话 KTable#toStream() 您将获得表的changelog流,其中包含表的每次更新的记录。
如果希望每个窗口只获得一个结果,可以使用 suppress() 接线员请稍等 KTable ,即, suppress() 拿第一个 KTable s changelog流,并等待窗口关闭,只将最终结果插入到其输出中 KTable . 如果你使用 suppress() ,则应将上游窗口聚合的宽限期(默认值为24小时)设置为较低的值,即, TimeWindows.of(...).grace(...) .
有关更多详细信息,请查看以下博客:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers

相关问题