apache flink广播状态被刷新

drnojrws  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(599)

我正在使用广播模式连接两个流并从一个流到另一个流读取数据。代码如下所示

  1. case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{
  2. override def processBroadcastElement(in2: (String, Double),
  3. context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
  4. collector:Collector[MyObject]):Unit={
  5. context.getBroadcastState(broadcastStateDescriptor).put(in2._1,in2._2)
  6. }
  7. override def processElement(obj: MyObject,
  8. readOnlyContext:BroadCastProcessFunction[MyObject, (String,Double),
  9. MyObject]#ReadOnlyContext, collector: Collector[MyObject]):Unit={
  10. val theValue = readOnlyContext.getBroadccastState(broadcastStateDesriptor).get(obj.prop)
  11. //If I print the context of the state here sometimes it is empty.
  12. out.collect(MyObject(new, properties, go, here))
  13. }
  14. }

状态描述符:

  1. val broadcastStateDescriptor: MapStateDescriptor[String, Double) = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])

我的执行代码是这样的。

  1. val streamA :DataStream[MyObject] = ...
  2. val streamB :DataStream[(String,Double)] = ...
  3. val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)
  4. streamA.connect(streamB).process(new Broadcast)

问题出在 processElement 函数状态有时是空的,有时不是。状态应该始终包含数据,因为我不断地从一个文件流,我知道它有数据。我不明白为什么它正在刷新状态,我无法获取数据。
我试着在纸上加些印刷品 processBroadcastElement 在将数据放入状态之前和之后,结果如下

  1. 0 - 1
  2. 1 - 2
  3. 2 - 3
  4. .. all the way to 48 where it resets back to 0

更新:我注意到,当我减少流执行上下文的超时值时,结果会更好一些。当我增加它时,Map总是空的。

  1. env.setBufferTimeout(1) //better results
  2. env.setBufferTimeout(200) //worse result (default is 100)
rslzwgfq

rslzwgfq1#

当两个流在flink中连接时,您无法控制flink将事件从这两个流传递到用户函数的时间。因此,例如,如果streama中有一个事件可供处理,streamb中有一个事件可供处理,那么接下来可能会处理其中的任何一个。您不能期望broadcastedstream以某种方式优先于另一个流。
根据您的需求,您可以采用各种策略来处理这两个流之间的竞争。例如,可以使用keyedbroadcastprocessfunction并使用其applytokeyedstate方法在新广播事件到达时迭代所有现有的keyed状态。

nwlqm0z1

nwlqm0z12#

正如大卫提到的,工作可能会重新开始。我禁用了检查点,这样就可以看到抛出的任何可能的异常,而不是让flink默默地失败并重新启动作业。
结果发现,试图解析文件时出错。因此,作业不断重新启动,因此状态为空,而flink一次又一次地消耗流。

相关问题