flink流:由控制流控制的数据流

kuuvgm7e  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(444)

我有一个问题是这个问题的一个变体:flink:如何存储状态并在另一个流中使用?
我有两条流:
val ipStream: DataStream[IPAddress] = ??? val routeStream: DataStream[RoutingTable] = ??? 我想知道哪个包裹使用哪条路线。通常可以通过以下方法完成:

val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"

这里的问题是,我不能真正在这里为流设置密钥,因为这既需要完整的表,也需要ip地址(并且密钥必须是独立计算的)。
对于来自 ipStream ,我需要最新的 routeStream 元素。现在我使用的是一种非并行处理的方法:

ipStream
  .connect(routeStream)
  .keyBy(_ => 0, _ => 0)
  .flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]

这听起来像是广播策略的用例。但是,routestream将被更新,并且不会在文件中修复。问题仍然存在:有没有一种方法可以拥有两个流,其中一个流包含另一个流的更改控制数据?

huwehgph

huwehgph1#

既然我解决了这个问题,我不妨在这里写一个答案:)
我对两条流设置了如下键:
路由表流是使用网络路由的第一个字节设置密钥的
IP地址也由地址的第一个字节键入
这是在ip包通常在具有相同/8前缀的网络中路由的情况下工作的,这可以假定为大多数流量。
然后,通过有状态的 RichCoFlatMap 可以将路由表状态建立为键。接收新ip包时,请在路由表中进行查找。现在有两种可能的情况:
找不到匹配的路由。我们可以在这里保存这个包,但是丢弃它也可以。
如果找到路由,则输出[ipaddress,routingtableentry]的元组。
这样,我们有两个流,其中一个流对另一个流的控制数据进行了更改。

相关问题