如何在keyedbroadcastprocessfunction中控制flink将输出发送到sideoutput

emeijp43  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(310)

我试图根据一组规则来验证数据流,以检测flink中的模式,方法是使用一组规则来验证数据流,我使用for循环来收集map中的所有模式,并在processelement fn中对其进行迭代,以找到模式示例代码,如下所示
mapstate描述符和边输出流如下

public static final MapStateDescriptor<String, String> ruleSetDescriptor =
        new MapStateDescriptor<String, String>("RuleSet", BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

public final static OutputTag<Tuple2<String, String>> unMatchedSideOutput =
        new OutputTag<Tuple2<String, String>>(
                "unmatched-side-output") {
        };

处理功能和广播功能如下:

@Override
    public void processElement(Tuple2<String, String> inputValue, ReadOnlyContext ctx,
                               Collector<Tuple2<String,
                                       String>> out) throws Exception {

        for (Map.Entry<String, String> ruleSet:
                ctx.getBroadcastState(broadcast.patternRuleDescriptor).immutableEntries()) {

            String ruleName = ruleSet.getKey();

//If the rule in ruleset is matched then send output to main stream and break the program
            if (this.rule) {
                out.collect(new Tuple2<>(inputValue.f0, inputValue.f1));
                break;
            }
        }

        // Writing output to sideout if no rule is matched 
        ctx.output(Output.unMatchedSideOutput, new Tuple2<>("No Rule Detected", inputValue.f1));
    }

    @Override
    public void processBroadcastElement(Tuple2<String, String> ruleSetConditions, Context ctx, Collector<Tuple2<String,String>> out) throws Exception {

        ctx.getBroadcastState(broadcast.ruleSetDescriptor).put(ruleSetConditions.f0,
                ruleSetConditions.f1);

    }

我能够检测到模式,但我也得到sideoutput,因为我试图一个接一个地迭代规则如果我的匹配规则出现在last中,程序将输出发送到sideoutput,因为初始规则集不匹配。我想打印侧输出只有一次,如果没有一个规则是满意的,我是新来Flink请帮助我如何才能实现它。

nom7f22z

nom7f22z1#

在我看来,你想做些更像这样的事情:

@Override
public void processElement(Tuple2<String, String> inputValue, ReadOnlyContext ctx, Collector<Tuple2<String, String>> out) throws Exception {

    transient boolean matched = false;

    for (Map.Entry<String, String> ruleSet:
      ctx.getBroadcastState(broadcast.patternRuleDescriptor).immutableEntries()) {

        String ruleName = ruleSet.getKey();

        if (this.rule) {
            matched = true;
            out.collect(new Tuple2<>(inputValue.f0, inputValue.f1));
            break;
        }
    }

    // Writing output to sideout if no rule was matched
    if (!matched) {
        ctx.output(Output.unMatchedSideOutput, new Tuple2<>("No Rule Detected", inputValue.f1));
    }
}

相关问题