Apache FlinkMap状态与值[Map[String,String]]用法

rbpvctlc  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(142)

我按键划分流,并为每个键管理一个Map状态;

stream
  .keyBy(_.userId)
  .process(new MyStateFunc)

每一次,我都必须读取一个键下的所有值,计算一些东西,并且只更新其中的几个。

class MyStateFunc() .. {
  

    val state = ValueState[Map[String, String]]

    def process(event: MyModel...): {
       val stateAsMap = state.value()
       val updatedStateValues = updateAFewColumnsOfStateValByUsingIncomingEvent(event, stateAsMap)
       doCalculationByUsingSomeValuesOfState(updatedStateValues)
       state.update(updatedStateValues)
    }
    def updateAFewColumnsOfStateValByUsingIncomingEvent(event, state): Map[String, String] = {
      val updateState = Map.empty
      event.foreach {case (status, newValue) => 
        updateState.put(status, newValue)
      }
      state ++ updatedState
    }
    def doCalculationByUsingSomeValuesOfState(stateValues): Map[String, String] = {
      // do some staff by using some key and values
    }
}

我不确定这是不是最有效的方法。是的,我必须读取所有的值(至少是其中的一些)来进行计算,但是我也需要更新其中的一些值,而不是每个键中存储的所有Map。我只是想知道哪一种更有效; Value[Map[String, String]]MapState[String, String]
如果我使用MapState[String, String],我必须做类似下面的事情来更新相关的键;

val state = MapState[String, String]
    def process(event: MyModel...): {
       val stateAsMap = state.entries().asScala
       event.foreach { case (status, newValue)
         state.put(status, newValue)
       }
    }

我不确定尝试更新每个事件类型的状态是否有效。

mapState.putAll(changeEvents)

这是否仅覆盖相关项而不是所有项?
还是可以另辟蹊径克服?

w6lpcovy

w6lpcovy1#

如果你的状态只有几个条目,那么这可能没什么关系。如果你的Map可以有大量的条目,那么使用MapState(带RocksDB状态后端)应该会大大减少序列化的成本,因为你只更新了几个条目,而不是整个状态。
请注意,为了提高效率,您应该对MapState迭代一次,进行计算并(偶尔)更新条目(假设这是可能的)。

相关问题