如果我们在KeyedStream#进程的KeyedProcessFunction中填充状态对象,例如
new KeyedProcessFunction<String, Rule, Rule>() {
private MapState<String, ArrayList<Rule>> rulesState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
rulesState = getRuntimeContext().getMapState(Descriptors.rulesPerCustomerDescriptor);
}
@Override
public void processElement(Rule value, KeyedProcessFunction<String, Rule, Rule>.Context ctx, Collector<Rule> out) throws Exception {
out.collect(value);
rulesState.put();
}
});
public class MyRichFilterFunction extends RichFilterFunction<Transaction> {
MapState<String, ArrayList<Rule>> rulesState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// use rules state variable to get the rules
rulesState = getRuntimeContext().getMapState(RulesEvaluator.Descriptors.rulesPerCustomerDescriptor);
}
@Override
public boolean filter(Transaction value) throws Exception {
for (Map.Entry<String, ArrayList<Rule>> entry : rulesState.entries()) {
for (Rule rule : entry.getValue()) {
.........
}
}
return true;
}
}
// usage
// fill the state
rulesUpdateStream.process(new ProcessFunction<Rule, Rule>()... // given above
// use the filled state
DataStream<Transaction> alerts = transactions.filter(new MyRichFilterFunction());
如果另一个KeyedStream#process方法使用相同的key进行分区,我们会从它们中获得相同的state对象吗?
1条答案
按热度按时间zbq4xfa01#
否。状态是建立状态的特定运营商的本地状态。它不能从任何其他地方访问。