apache flink rocksdb状态管理

sg24os4d  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(479)

我在同一个flink jobs里读了两个Kafka主题。 Stream1 :来自第一个主题的消息保存到rocksdb,然后它将与stream2合并。 Stream2 :来自第二个主题的消息使用stream1保存的状态进行丰富,然后它将与stream1合并。
主题1和主题2是不同的来源,但基本上两个来源的输出是相同的。我只需要用topic1的数据来丰富topic2的数据。
这里是流动;

val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)

问题来了;
这个流程好吗?
可以 stream2 访问保存的状态 stream1 同样的道理 memberId ?

v440hwme

v440hwme1#

似乎你应该能够通过使用 KeyedCoProcessFunction . 这或多或少是这样的:

stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())

这样你就可以把州保持在单一的状态 KeyedCoProcessFunction ,因此您可以同时访问 stream1 以及 stream2 .
所以,为了 processElement1 你可以在里面做同样的事情 map 为了 stream1 而且在 processElement2 你可以在里面做同样的事情 map 对于stream2。

相关问题