spring和kafka:加入3个kafka主题以生成输出kafka流

cu6pst1q  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(473)

我需要加入3个Kafka主题。前两个主题a和b将使用inner join添加,因为消息键相同,并生成一个pojo与b相同的新kafka流。现在有了这个累积的流,我需要加入另一个主题c,我需要根据一个字段对输出进行分组,这个字段在c中。
到目前为止,我有以下方法:
kstream-前两个主题(a和b)的kstream内部连接是否可以不在任何主题上发布这个累积输出,并且仍然可以在下面使用它
kstream-kstream(在累积流和主题c上方)
你能推荐一个更好的方法或者任何我可以在java中看到的类似实现的例子吗。

ca1c2owp

ca1c2owp1#

可以使用两个连续联接:

  1. KStream streamAB = streamA.join(streamB, ...);
  2. // either:
  3. KStream streamABC = streamA.selectKey(...) // set to the key as in streamC
  4. .join(streamC, ...);
  5. // or:
  6. KStream streamCNew = streamC.selectKey(...); // set to the key as in streamAB
  7. KStream streamABC = streamA.join(streamCnew, ...);
  8. // or:
  9. KStream streamCNew = streamC.selectKey(...); // set to a new join key
  10. KStream streamABC = streamA.selectKey(...) // set to a new join key
  11. .join(streamC, ...);
  12. streamABC.selectKey(/* extract grouping field and set as key */).to("outputTopic");

相关问题