Does Flink always return the same partitioned state object for the same key?

2ul0zpep  posted on 2023-09-28  in  Apache

Suppose we populate a state object inside a KeyedProcessFunction passed to KeyedStream#process, for example:

new KeyedProcessFunction<String, Rule, Rule>() {
    // Keyed map state, scoped to this operator and to the current key
    private MapState<String, ArrayList<Rule>> rulesState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        rulesState = getRuntimeContext().getMapState(Descriptors.rulesPerCustomerDescriptor);
    }

    @Override
    public void processElement(Rule value, KeyedProcessFunction<String, Rule, Rule>.Context ctx, Collector<Rule> out) throws Exception {
        out.collect(value);
        // MapState#put expects a key and a value
        rulesState.put(/* key, value elided in the original */);
    }
});

public class MyRichFilterFunction extends RichFilterFunction<Transaction> {
    MapState<String, ArrayList<Rule>> rulesState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // use rules state variable to get the rules
        rulesState = getRuntimeContext().getMapState(RulesEvaluator.Descriptors.rulesPerCustomerDescriptor);
    }

    @Override
    public boolean filter(Transaction value) throws Exception {
        for (Map.Entry<String, ArrayList<Rule>> entry : rulesState.entries()) {
            for (Rule rule : entry.getValue()) {
                // .........
            }
        }
        return true;
    }
}

// usage
// fill the state
rulesUpdateStream.process(new KeyedProcessFunction<String, Rule, Rule>()... // given above
// use the filled state
DataStream<Transaction> alerts = transactions.filter(new MyRichFilterFunction());

If another function (for example a second KeyedStream#process call, or the filter above) is partitioned by the same key, will it get the same state object?

zbq4xfa0  #1

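No. State is local to the specific operator that creates it. It cannot be accessed from anywhere else, even by another operator that is keyed by the same key and uses the same state descriptor.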
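
To make the scoping rule concrete, here is a minimal toy model in plain Java. It is not Flink code and does not reflect how Flink's state backends are implemented; ToyOperator, addRule, and rulesFor are hypothetical names invented for this sketch. It only illustrates the rule stated above: each operator owns its own keyed state, so a value written under a key in one operator is not visible under the same key in another.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: mimics the scoping rule, not Flink's actual state backend. */
class OperatorScopedStateDemo {

    /** Hypothetical stand-in for one operator's keyed state: key -> rule names. */
    static final class ToyOperator {
        private final String name;
        // Each operator instance owns its own map; nothing is shared across operators.
        private final Map<String, List<String>> keyedState = new HashMap<>();

        ToyOperator(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }

        /** Appends a rule to this operator's state for the given key. */
        void addRule(String key, String rule) {
            keyedState.computeIfAbsent(key, k -> new ArrayList<>()).add(rule);
        }

        /** Reads this operator's state for the given key (empty if never written here). */
        List<String> rulesFor(String key) {
            return keyedState.getOrDefault(key, Collections.emptyList());
        }
    }

    public static void main(String[] args) {
        // Two separate operators, both "keyed" by customer id.
        ToyOperator rulesProcess = new ToyOperator("rules-process");
        ToyOperator transactionFilter = new ToyOperator("transaction-filter");

        // The first operator fills its state for customer-1.
        rulesProcess.addRule("customer-1", "max-amount");

        // Same key, different operator: the second operator sees nothing.
        System.out.println(rulesProcess.name() + ": " + rulesProcess.rulesFor("customer-1"));
        // prints "rules-process: [max-amount]"
        System.out.println(transactionFilter.name() + ": " + transactionFilter.rulesFor("customer-1"));
        // prints "transaction-filter: []"
    }
}
```

In the question's setup this means MyRichFilterFunction would read its own, empty rulesState rather than the one filled by the KeyedProcessFunction. One common way to let the transaction logic see the rules is to bring both streams into a single operator, for example by connecting the two keyed streams and using a KeyedCoProcessFunction, or by using Flink's broadcast state pattern; which one fits depends on how the rules are keyed.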
