我希望有人能帮我解决一个关于Kafkaglobalktables的问题。
我正在尝试执行kstream globalktable联接。但是,我希望检索globalktable中的所有条目,这些条目的键或值包含在原始流事件中找到的字符串。例如,假设我的表有3行,其中包含以下键:
Key: BANK055DEPOSIT value: {some data}
Key: BANK055CREDIT value: {different data}
Key: BANK033CREDIT value: {more different data}
当我对表执行连接以检索数据时,我需要收回其键或值包含“055”的所有行。所以我想要前两排。
在数据库世界中,这相当于以下内容:
SELECT * FROM GlobalKTable where table_key.contains("055”) OR table_value.contains(“055”)
我浏览了官方文件,没有找到如何做到这一点的例子。我怀疑从globalktable联接中检索n个行是不可能的。
另外,我使用streams dsl来实现这个。不确定使用处理器api是否可以实现这一点。任何意见都将不胜感激!
1条答案
按热度按时间cpjpxq1n1#
当加入
KStream
用一个GlobalKTable
,可以使用部分键和值KStream,
但最终必须与整个GlobalKTable
所以,不幸的是,你不能像上面所说的那样使用join。但是,即使使用dsl,您也应该能够做到这一点。如果你用过
KStream.transformValues
用一个ValueTransformerWithKeySupplier
,您可以扫描statestore并根据流记录中包含的子字符串提取所需的记录。此外,您不一定需要扫描整个存储,而是使用范围查询。编辑:这里有一些代码,我得到的工作,以证明我得到了什么。
嗯,比尔