所需功能:对于给定的密钥key123,许多服务并行运行,并将其结果报告给单个位置,一旦为key123收集了所有结果,它们就会传递给新的下游使用者。
最初的想法:使用awsdynamiodb保存给定条目的所有结果。每次结果准备就绪时,一个微服务都会对key123上的数据库执行修补操作。输出流检查每个更新以查看条目是否完整,如果完整,则将其转发到下游。
新想法:使用kafka流和ksql来达到相同的目标。所有服务都将其输出写入results主题,该主题形成一个更改日志kstream,我们可以用ksql查询完成的条目。比如:
CREATE STREAM competed_results FROM results_stream SELECT * WHERE (all results != NULL).
我不知道该怎么做的部分是流上的补丁操作。要让输出流显示key123的所有消息的累积,而不是只显示最近的消息?
ksql用户,这有意义吗?我是否接近某个人以前做过的解决方案?
1条答案
按热度按时间jm2pwxwz1#
如果可以使用密钥集为同一主题生成所有事件,则可以使用ksqldb中的聚合来收集特定密钥的所有事件,例如:
这将创建一个名为
AGG
默认情况下。当接收到源主题上特定键的新事件时,ksqldb将向AGG
主题,并将键设置为key
以及包含所有events
找到那把钥匙。然后可以将此变更日志作为流导入:
然后,您可以应用一些条件来过滤流,以便只包含最终结果:
您甚至可以使用用户定义的函数来定义完整的条件: