在kafka流中加入外键

np8igboo  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(201)

假设我有三个kafka主题,其中充满了表示不同聚合中发生的业务事件的事件(事件源应用程序)。这些事件允许构建具有以下属性的聚合:
用户:usedid,name
应用程序的模块:moduleid,name
应用程序模块的用户授权:grantid、userid、moduleid、scope
现在我想创建一个包含所有授权的流,其中包含用户和产品的名称(而不是id)。我想这样做:
通过按userid分组事件,为用户创建ktable。ktable以userid作为键。没关系。
通过按productid对事件进行分组来创建产品的ktable。ktable的productid是键。没关系。
从grant流创建一个流,并连接两个ktable。这是不行的。问题是连接似乎只能在主键上进行。但是流的密钥是grant的技术标识符,而用户和产品表的密钥不是(它们与grant无关)。
那么如何进行呢?

rpppsulh

rpppsulh1#

此功能作为kafka streams 2.4.0的一部分发布。
下面是有关使用此功能的官方教程。

lh80um4z

lh80um4z2#

嗯,目前在kafka流中没有外键连接的直接支持。
有一个开放的基普:https://issues.apache.org/jira/browse/kafka-3705 同样的道理。
目前,可以有一个解决这个问题的方法。您可以使用kstream ktable join。
首先用聚合的事件集合将用户流和模块流聚合到各自的ktable中。

KTable<String,Object> UserTable = userStream.groupBy(<UserId>).aggregate(<... build collection/latest event>) ;
KTable<String,Object> ModuleTable = moduleStream.groupBy(<ModuleId>).aggregate(<... build collection/latest event>);

现在选择moduleid作为授权流中的一个键。

KStream<String,Object> grantRekeyedStream = grantStream.selectKey(<moduleId>);

它将把键改为moduleid。现在可以使用moduletable执行流表连接。它将从右边为左边的键连接所有匹配的记录。结果流将把grant和module数据放入一个以moduleid为键的流中。

KStream<String,Object> grantModuleStream = grantRekeyedStream.join(moduleTable);

下一步是加入usertable。因此,需要使用userid重新为grantmoduletable设置密钥。

KStream<String,Object> grantModuleRekeyedStream = grantModuleTable.selectKey(<Select UserId>);

现在grantmodulerekeyedstream可以通过kstream ktable join与usertable连接

KStream<String,Object> grantModuleUserStream = grantModuleRekeyedStream .join(userTable);

上面的流将用户id作为密钥,并包含该用户的所有授权和模块详细信息。

相关问题