我在同一个flink jobs里读了两个Kafka主题。 Stream1
:来自第一个主题的消息保存到rocksdb,然后它将与stream2合并。 Stream2
:来自第二个主题的消息使用stream1保存的状态进行丰富,然后它将与stream1合并。
主题1和主题2是不同的来源,但基本上两个来源的输出是相同的。我只需要用topic1的数据来丰富topic2的数据。
这里是流动;
val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)
问题来了;
这个流程好吗?
可以 stream2
访问保存的状态 stream1
同样的道理 memberId
?
1条答案
按热度按时间v440hwme1#
似乎你应该能够通过使用
KeyedCoProcessFunction
. 这或多或少是这样的:这样你就可以把州保持在单一的状态
KeyedCoProcessFunction
,因此您可以同时访问stream1
以及stream2
.所以,为了
processElement1
你可以在里面做同样的事情map
为了stream1
而且在processElement2
你可以在里面做同样的事情map
对于stream2。