apache flink广播状态被刷新

drnojrws  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(591)

我正在使用广播模式连接两个流并从一个流到另一个流读取数据。代码如下所示

case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{
  override def processBroadcastElement(in2: (String, Double), 
                                       context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
                                       collector:Collector[MyObject]):Unit={
    context.getBroadcastState(broadcastStateDescriptor).put(in2._1,in2._2)
  }

  override def processElement(obj: MyObject,
                            readOnlyContext:BroadCastProcessFunction[MyObject, (String,Double), 
                            MyObject]#ReadOnlyContext, collector: Collector[MyObject]):Unit={
    val theValue = readOnlyContext.getBroadccastState(broadcastStateDesriptor).get(obj.prop)
    //If I print the context of the state here sometimes it is empty.
    out.collect(MyObject(new, properties, go, here))
  }
}

状态描述符:

val broadcastStateDescriptor: MapStateDescriptor[String, Double) = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])

我的执行代码是这样的。

val streamA :DataStream[MyObject] = ... 
val streamB :DataStream[(String,Double)] = ... 
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)

streamA.connect(streamB).process(new Broadcast)

问题出在 processElement 函数状态有时是空的,有时不是。状态应该始终包含数据,因为我不断地从一个文件流,我知道它有数据。我不明白为什么它正在刷新状态,我无法获取数据。
我试着在纸上加些印刷品 processBroadcastElement 在将数据放入状态之前和之后,结果如下

0 - 1
1 - 2 
2 - 3 
.. all the way to 48 where it resets back to 0

更新:我注意到,当我减少流执行上下文的超时值时,结果会更好一些。当我增加它时,Map总是空的。

env.setBufferTimeout(1) //better results 
env.setBufferTimeout(200) //worse result (default is 100)
rslzwgfq

rslzwgfq1#

当两个流在flink中连接时,您无法控制flink将事件从这两个流传递到用户函数的时间。因此,例如,如果streama中有一个事件可供处理,streamb中有一个事件可供处理,那么接下来可能会处理其中的任何一个。您不能期望broadcastedstream以某种方式优先于另一个流。
根据您的需求,您可以采用各种策略来处理这两个流之间的竞争。例如,可以使用keyedbroadcastprocessfunction并使用其applytokeyedstate方法在新广播事件到达时迭代所有现有的keyed状态。

nwlqm0z1

nwlqm0z12#

正如大卫提到的,工作可能会重新开始。我禁用了检查点,这样就可以看到抛出的任何可能的异常,而不是让flink默默地失败并重新启动作业。
结果发现,试图解析文件时出错。因此,作业不断重新启动,因此状态为空,而flink一次又一次地消耗流。

相关问题