我想做的是:
使用数字主题中的记录(长)
聚合(计数)每5秒窗口的值
将最终聚合结果发送到另一个主题
我的代码如下所示:
KStream<String, Long> longs = builder.stream(
Serdes.String(), Serdes.Long(), "longs");
// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts =
longs.countByKey(TimeWindows.of("longCounts", 5000L));
// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
.to("long-counts");
看起来一切都像预期的那样工作,但是聚合被发送到每个传入记录的目标主题。我的问题是如何只发送每个窗口的最终聚合结果?
3条答案
按热度按时间agyaoht71#
在Kafka流中,没有所谓的“最终聚集”。窗口始终保持打开状态,以处理在窗口结束时间过后到达的无序记录。然而,Windows并不是永远保存的。一旦它们的保留时间过期,它们就会被丢弃。对于窗口何时被丢弃,没有特殊的操作。
有关更多详细信息,请参阅合流文档:http://docs.confluent.io/current/streams/
因此,对于聚合的每次更新,都会生成一个结果记录(因为kafka流也会更新无序记录的聚合结果)。您的“最终结果”将是最新的结果记录(在窗口被丢弃之前)。根据您的用例,手动重复数据消除将是解决问题的一种方法(使用较低级别的api,
transform()
或者process()
)这篇博文可能也会有所帮助:https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html
另一篇不使用标点符号的博客文章:http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html
更新
使用kip-328,a
KTable#suppress()
添加了操作符,允许严格抑制连续更新,并在每个窗口中发出单个结果记录;折衷的办法是增加延迟。5fjcxozz2#
我面临这个问题,但我解决了这个问题,在固定窗口之后添加grace(0),并使用抑制的api
在这里你可以看到结果
b09cbbtk3#
从kafka streams 2.1版开始,您可以使用
suppress
.上述apache kafka streams文档中有一个示例,当用户一小时内发生的事件少于三个时,它会发送警报:
正如这个答案的更新中所提到的,您应该知道这种权衡。此外,请注意suppress()是基于事件时间的。