Kafka流表补丁日志未满后

rbl8hiat  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(357)

所需功能:对于给定的密钥key123,许多服务并行运行,并将其结果报告给单个位置,一旦为key123收集了所有结果,它们就会传递给新的下游使用者。
最初的想法:使用awsdynamiodb保存给定条目的所有结果。每次结果准备就绪时,一个微服务都会对key123上的数据库执行修补操作。输出流检查每个更新以查看条目是否完整,如果完整,则将其转发到下游。
新想法:使用kafka流和ksql来达到相同的目标。所有服务都将其输出写入results主题,该主题形成一个更改日志kstream,我们可以用ksql查询完成的条目。比如:

CREATE STREAM competed_results FROM results_stream SELECT * WHERE (all results != NULL).

我不知道该怎么做的部分是流上的补丁操作。要让输出流显示key123的所有消息的累积,而不是只显示最近的消息?
ksql用户,这有意义吗?我是否接近某个人以前做过的解决方案?

jm2pwxwz

jm2pwxwz1#

如果可以使用密钥集为同一主题生成所有事件,则可以使用ksqldb中的聚合来收集特定密钥的所有事件,例如:

CREATE STREAM source (
    KEY INT KEY,  -- example key to group by
    EVENT STRING  -- example event to collect
  ) WITH (
   kafka_topic='source', -- or whatever your source topic is called.
   value_format='json' -- or whatever value format you need.
);

CREATE TABLE agg AS
  SELECT 
    key, 
    COLLECT_LIST(event) as events
  FROM source
  GROUP BY key;

这将创建一个名为 AGG 默认情况下。当接收到源主题上特定键的新事件时,ksqldb将向 AGG 主题,并将键设置为 key 以及包含所有 events 找到那把钥匙。
然后可以将此变更日志作为流导入:

CREATE STREAM agg_stream (
   KEY INT KEY,
   EVENTS ARRAY<STRING> 
) WITH (
   kafka_topic='AGG',
   value_format='json'
);

然后,您可以应用一些条件来过滤流,以便只包含最终结果:

STREAM competed_results AS 
  SELECT 
    * 
  FROM agg_stream 
  WHERE ARRAY_LEN(EVENTS) = 5; -- example 'complete' criteria.

您甚至可以使用用户定义的函数来定义完整的条件:

STREAM competed_results AS 
  SELECT 
    * 
  FROM agg_stream 
  WHERE IS_COMPLETE(EVENTS);

相关问题