如何在Apache Flink中使用初始值初始化键控状态?

gblwokeq  于 2023-02-10  发布在  Apache
关注(0)|答案(1)|浏览(239)

我想在Apache Flink的KeyedStream中初始化MapState,初始值如本文末尾的代码片段所示,不幸的是,Flink不允许在open方法中这样做,如下所述:由于我引用的帖子是2020年的,我希望从那以后可能发生了一些变化。
我希望放入MapState的初始值对于所有键都是相同的。
"我所尝试的"

  • 重写org.apache.flink.streaming.api.checkpoint.CheckpointedFunction中的initializeState函数并在其中添加myState.put(...)内容,但这会导致与在open方法中执行此操作相同的异常
  • 这篇文章提到使用OperatorState,但我不认为这适用于我的用例:Flink keyed stream key is null

我意识到可以在processElement中执行类似if (myState.isEmpty()) { addInitialStateToMyState() }的操作,但希望避免这种情况

new KeyedProcessFunction<String, Row, String>()  {
            MapState<String,String> myState;

            @Override
            public void processElement(final Row event, final KeyedProcessFunction<String, Row, String>.Context context, final Collector<String> collector) throws Exception {
               ...
               myState.put("this", "works")
               ...
            }

            @Override
            public void open(Configuration configuration) throws Exception {
                MapStateDescriptor<> myStateDescriptor = new MapStateDescriptor<>(
                        "my-state",
                        String.class,
                        String.class
                );
                myState = getRuntimeContext().getMapState(myStateDescriptor);

                // can't initialize here; can only do this inside `processElement`
                myState.put("this", "fails");
            }

感谢您的任何帮助/见解!

f2uvfpb9

f2uvfpb91#

遗憾的是,我不认为有一种方法可以更好地做到这一点。所有对键控状态的访问都必须以键感知的方式完成,这意味着它需要在processElement中完成。运算符状态通常更复杂,所以如果选择它的唯一原因是代码清洁,我会避免它。

相关问题