我想在Apache Flink的KeyedStream
中初始化MapState
,初始值如本文末尾的代码片段所示,不幸的是,Flink不允许在open
方法中这样做,如下所述:由于我引用的帖子是2020年的,我希望从那以后可能发生了一些变化。
我希望放入MapState的初始值对于所有键都是相同的。
"我所尝试的"
- 重写
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
中的initializeState
函数并在其中添加myState.put(...)
内容,但这会导致与在open
方法中执行此操作相同的异常 - 这篇文章提到使用
OperatorState
,但我不认为这适用于我的用例:Flink keyed stream key is null
我意识到可以在processElement
中执行类似if (myState.isEmpty()) { addInitialStateToMyState() }
的操作,但希望避免这种情况
new KeyedProcessFunction<String, Row, String>() {
MapState<String,String> myState;
@Override
public void processElement(final Row event, final KeyedProcessFunction<String, Row, String>.Context context, final Collector<String> collector) throws Exception {
...
myState.put("this", "works")
...
}
@Override
public void open(Configuration configuration) throws Exception {
MapStateDescriptor<> myStateDescriptor = new MapStateDescriptor<>(
"my-state",
String.class,
String.class
);
myState = getRuntimeContext().getMapState(myStateDescriptor);
// can't initialize here; can only do this inside `processElement`
myState.put("this", "fails");
}
感谢您的任何帮助/见解!
1条答案
按热度按时间f2uvfpb91#
遗憾的是,我不认为有一种方法可以更好地做到这一点。所有对键控状态的访问都必须以键感知的方式完成,这意味着它需要在
processElement
中完成。运算符状态通常更复杂,所以如果选择它的唯一原因是代码清洁,我会避免它。