尝试合并多个Kafka流,聚合并产生一个新的主题。但是,在同一个窗口中,代码生成的聚合记录数量与每个输入流中的总输入记录数量相同。我希望聚合只在连接窗口的末尾生成1个输出。在下面的代码中我做错了什么-
val streams = requestStreams.merge(successStreams).merge(errorStreams)
.groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
.aggregate({ null }, StreamAggregators.notificationMetricAggregator, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("ag-store")
.withValueSerde(serdesConfig.notificationMetricSerde()))
.toStream()
streams.to(notificationStreamsConfig.metricsTopic, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String::class.java, 10), serdesConfig.notificationMetricSerde()))
1条答案
按热度按时间hi3rlvi21#
默认情况下,kafka streams使用连续更新处理模型。注意,聚合的结果是
KTable
. 此结果表包含每个窗口的一行,每次处理新记录时,都会更新窗口(即表中的行)。如果你打电话
KTable#toStream()
您将获得表的changelog流,其中包含表的每次更新的记录。如果希望每个窗口只获得一个结果,可以使用
suppress()
接线员请稍等KTable
,即,suppress()
拿第一个KTable
s changelog流,并等待窗口关闭,只将最终结果插入到其输出中KTable
. 如果你使用suppress()
,则应将上游窗口聚合的宽限期(默认值为24小时)设置为较低的值,即,TimeWindows.of(...).grace(...)
.有关更多详细信息,请查看以下博客:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers