我想加入 KStream
与 GlobalKTable
按键,但有特定的逻辑。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"
stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
(key, value) -> key,
(valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});
例如,如果键=“”,则:
首先,按完整键连接-即“”=“”
然后,如果未连接,则通过前两个符号连接(删除一个符号)-即“ab”=“ab”
最后,试着只用一个符号来连接-即“a”=“a”
此外,还需要知道执行连接的条件-例如,3个字母/2个字母/1个字母。
问题是,这是可能的还是我应该寻找一个解决办法?例如,用相应的键复制globalktable(表带有“”键,一个带有“ab”键,一个带有“a”键)并执行3个单独的联接?或者其他建议?
提前谢谢!
1条答案
按热度按时间nwnhqdif1#
对多个表使用一系列左联接是可能的(如果您知道经常要尝试这种联接)。如果连接成功,则跳过下一个连接。结合使用
leftJoin()
以及branch()
应该允许您在每次联接之后将流拆分为“联接”和“重试”。最后,你可以merge()
不同的结果流在一起,如果你想。