flink进程函数没有将数据返回到sideoutputstream

brc7rcf0  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(390)

我正在尝试用一组规则验证jsonobject如果json与一组规则匹配,它将返回匹配的规则,而jsonobject如果不匹配,它将返回一个jsonobject到sideoutput所有这些都是在processFunction中处理的,我得到的是主输出,但无法捕获side输出
sideoutput流定义如下

public final static OutputTag<org.json.JSONObject> unMatchedJSONSideOutput = new OutputTag<org.json.JSONObject>(
            "unmatched-side-output") {};

processfunction定义如下

public class RuleFilter extends ProcessFunction<Tuple2<String,org.json.JSONObject>,Tuple2<String,org.json.JSONObject>> {
@Override
    public void processElement(Tuple2<String, org.json.JSONObject> value,
            ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>>.Context ctx,
            Collector<Tuple2<String, org.json.JSONObject>> out) throws Exception {

        if(this.value.matches((value.f1))) {
        out.collect(new Tuple2<String, org.json.JSONObject>(value.f0,value.f1));
        }else {
            ctx.output(RuleMatching.unMatchedJSONSideOutput,value.f1);
        }
    }
}

我正在打印主数据流输出,如下所示

DataStream<Tuple2<String, org.json.JSONObject>> matchedJSON =
                            inputSignal.map(new MapFunction<org.json.JSONObject, Tuple2<String, org.json.JSONObject>>() {
                                @Override
                                public Tuple2<String, org.json.JSONObject> map(org.json.JSONObject input) throws Exception {
                                    return new Tuple2<>(value, input);
                                }
                            }).process(new RuleFilter()).print("MatchedJSON=>");

matchedJSON .print("matchedJSON=>");

我打印侧输出如下

DataStream<org.json.JSONObject> unmatchedJSON =
                        ((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(new MapFunction<Tuple2<String, org.json.JSONObject>, org.json.JSONObject>() {
                            @Override
                            public org.json.JSONObject map(Tuple2<String, org.json.JSONObject> value) throws Exception {
                                return value.f1;
                            }
                        })).getSideOutput(unMatchedJSONSideOutput );

                unmatchedJSON.print("unmatchedJSON=>");

主流是打印输出,但是sideoutput没有打印无效的json请帮助解决这个问题

nc1teljy

nc1teljy1#

问题在于:

DataStream<org.json.JSONObject> unmatchedJSON =
    ((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(...))
    .getSideOutput(unMatchedJSONSideOutput);

你应该打电话来 getSideOutput 直接打开 matchedJSON ,而不是应用 MapFunction 去吧。只有一个 ProcessFunction 可以有一个侧输出,它需要直接从 ProcessFunction . 通过从Map中强制转换输出流,您欺骗了编译器接受了这一点,但是运行时无法对此执行任何有意义的操作。

相关问题