错误:方法apply不适用于参数(windowfunction)

hjzp0vay  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(484)

我刚开始使用apache flink。我从apachekafka源代码读取数据,需要转换 DataStream .
在最后一步中,我尝试应用 WindowFunction :

DataStream<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> dataStream = 
                     env
                    .addSource(new FlinkKafkaConsumer08<>(
                                    parameterTool.getRequired("topic"),
                                    new SimpleStringSchema(), 
                                    parameterTool.getProperties()))
                    .flatMap(new SplitIntoRecordsString())
                    .flatMap(new SplitIntoTuples())
                    .keyBy(1)
                    .countWindow(5)
                    .apply(new windowApplyFunction());

    public class windowApplyFunction implements WindowFunction<
                                                            Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                                                            String,  
                                                            Double, 
                                                            Window>{

    public void apply(Double key, Window window,
            Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values,
            Collector<String> out)
            throws Exception {      
        out.collect("MyResult");
    }
}

不幸的是,我遇到了以下错误,不知道如何修复它:

The method apply(WindowFunction<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,R,Tuple,GlobalWindow>) in the type WindowedStream<Tuple8<Double,Double,String,Double,Double,Double,Double,Double>,Tuple,GlobalWindow> is not applicable for the arguments (FlinkManager.windowApplyFunction)

如果我换新的,一切正常 apply(new windowApplyFunction()) 具有预定义的功能,例如。 sum(1) .

iezvtpos

iezvtpos1#

你的 WindowFunction 应该是类型

WindowFunction<
    Tuple8<Double, Double, String, Double, Double, Double, Double, Double>,
    String,
    Double,
    GlobalWindow>
``` `countWindow()` 退货 `GlobalWindow` 类型。
试试看。
lhcgjxsq

lhcgjxsq2#

谢谢你的提示!在纠正了这个错误后,我又换了一个小东西,现在可以用了!正确的代码:

public static class windowApplyFunction implements WindowFunction<
                                                            Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                                                            Tuple8<Double, Double, String, Double, Double, Double, Double, Double>, 
                                                            Tuple, 
                                                            GlobalWindow>{

    public void apply(Tuple key, GlobalWindow window,
            Iterable<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> values,
            Collector<Tuple8<Double, Double, String, Double, Double, Double, Double, Double>> out)
            throws Exception {      
        out.collect(new Tuple8<Double, Double, String, Double, Double, Double, Double, Double>());
    }
}

相关问题