背景
在《设计事件驱动系统》一书中,作者重点介绍了业务系统中需要等待或等待n个事件发生的一个常见用例。
给出的示例是一个orders服务,它需要等待三个单独的验证服务返回一个 PASS
. (我认为这意味着在同一主题上会有三条具有相同键的验证消息,每条消息都有一个表示成功或失败的值。)
作者指出,解决方案将采用以下形式(假设计数基于密钥):
按键分组。
计算每个键的出现次数(使用与窗口一起执行的聚合器)。
过滤所需计数的输出。
问题
上面的每一步具体是如何工作的,所涉及的类/方法是什么?
特别是,第一步(按键分组)是否涉及使用kstream::groupbykey方法?
如果是这样,那么输出将是一个kgroupedstream,第二步可能会使用count方法之一并返回一个ktable。
但是,有了一个ktable,我们如何按照步骤3过滤ktable上的输出?
1条答案
按热度按时间ujv3wf0j1#
我想你的假设是对的。对于步骤(2),这取决于,但是如果您假设您等待的所有消息都具有相同的密钥,并且您只在收到三条具有相同密钥的消息时才感兴趣,则调用
count()
是你想要的。作为下一步(即,(3)),您将调用
KTable#filter()
获取计数为3的所有行。最后,你可以打电话
toStream()
每次一个键达到3的计数时,这个流应该包含一个记录。(附言:所有参赛作品将留在
KTable
默认情况下是永久的,因此您还需要注意删除在某个时刻达到计数3的条目。)