我按键划分流,并为每个键管理一个Map状态;
stream
.keyBy(_.userId)
.process(new MyStateFunc)
每一次,我都必须读取一个键下的所有值,计算一些东西,并且只更新其中的几个。
class MyStateFunc() .. {
val state = ValueState[Map[String, String]]
def process(event: MyModel...): {
val stateAsMap = state.value()
val updatedStateValues = updateAFewColumnsOfStateValByUsingIncomingEvent(event, stateAsMap)
doCalculationByUsingSomeValuesOfState(updatedStateValues)
state.update(updatedStateValues)
}
def updateAFewColumnsOfStateValByUsingIncomingEvent(event, state): Map[String, String] = {
val updateState = Map.empty
event.foreach {case (status, newValue) =>
updateState.put(status, newValue)
}
state ++ updatedState
}
def doCalculationByUsingSomeValuesOfState(stateValues): Map[String, String] = {
// do some staff by using some key and values
}
}
我不确定这是不是最有效的方法。是的,我必须读取所有的值(至少是其中的一些)来进行计算,但是我也需要更新其中的一些值,而不是每个键中存储的所有Map。我只是想知道哪一种更有效; Value[Map[String, String]]
与MapState[String, String]
?
如果我使用MapState[String, String]
,我必须做类似下面的事情来更新相关的键;
val state = MapState[String, String]
def process(event: MyModel...): {
val stateAsMap = state.entries().asScala
event.foreach { case (status, newValue)
state.put(status, newValue)
}
}
我不确定尝试更新每个事件类型的状态是否有效。
mapState.putAll(changeEvents)
这是否仅覆盖相关项而不是所有项?
还是可以另辟蹊径克服?
1条答案
按热度按时间w6lpcovy1#
如果你的状态只有几个条目,那么这可能没什么关系。如果你的Map可以有大量的条目,那么使用
MapState
(带RocksDB状态后端)应该会大大减少序列化的成本,因为你只更新了几个条目,而不是整个状态。请注意,为了提高效率,您应该对
MapState
迭代一次,进行计算并(偶尔)更新条目(假设这是可能的)。