我有两个Kafka主题包含不同的信息到“警告事件”。为了知道主题A和B中的哪些条目相互对应,我必须比较序列,日期和机器形式的密钥,然后加入“否”
重要提示:row_number可以不同(参见下面的示例)
主题A包含如下值:
关键字:{"serial":187,"date":"11/16/2022","row":0,"machine":"Blue"}
值:{ "no": 1, "frequency": 0 }
主题B包含如下值:
关键字:{"serial":187,"date":"11/16/2022","row":99,"machine":"Blue"}
值:{ "warning": "Emergency", "no": 1 }
我想要的输出主题C通过“no”组合了这两个信息:
关键字:{"serial":187,"date":"11/16/2022","row":0,"machine":"Blue"}
值:{ "no": 1, "frequency": 0, "warning": "Emergency!" }
它应该基本上是主题A(具有相同的键,但添加了来自主题B的警告名称(明文))。
我对Kafka完全是个菜鸟,所以我在这个问题上已经挣扎了很长时间。
我尝试使用KStream.innerJoints,但是当键的一部分(row_number)不一定相同时,很难得到连接。
1条答案
按热度按时间w8f9ii691#
要进行正确的连接,您需要在连接之前修改键,例如通过
selectKey
或map
删除行号。如果输出中不需要行号,最简单的方法是使用selecKey
并删除它。如果需要输出中的行号,则需要使用map,并将其从键移到值中(可以在连接后使用另一个map
将其从值放回键中)。