flink:找不到适合进程的方法

trnvg8h3  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(378)

我试图在flink的数据流上应用每个窗口的功能。下面是我的代码

DataStream<Tuple2<String, String>> data = ...
DataStream<Tuple2<String, String>> freqCityChangeTransactions = data
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .process(new MyProcessWindowFunction());

下面是myprocesswindowfunction的实现

public static class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, String>, Tuple2<String, String>, String, TimeWindow> {

public void process(String key,
            Context context,
            Iterable<Tuple2<String, String>> input,
            Collector<Tuple2<String, String>> out) {
        // Do something ...
    }
}

然而,当我试图通过maven编译上述代码时,我得到以下错误

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.6.1:compile (default-compile) on project flink-examples: Compilation failure
[ERROR] /Users/furqan/Workspace/flink/src/main/java/com/baig/bank/Bank.java:[120,13] no suitable method found for process(com.baig.Bank.MyProcessWindowFunction)
[ERROR] method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,java.lang.String>,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow>) is not applicable
[ERROR] (cannot infer type-variable(s) R
[ERROR] (argument mismatch; com.baig.Bank.MyProcessWindowFunction cannot be converted to org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,java.lang.String>,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow>))
[ERROR] method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,java.lang.String>,R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow>,org.apache.flink.api.common.typeinfo.TypeInformation<R>) is not applicable
[ERROR] (cannot infer type-variable(s) R
[ERROR] (actual and formal argument lists differ in length))

知道我做错什么了吗?仅供参考,我正在使用ApacheFlinkVersion1.5.1,并在mac上使用maven3编译java代码。

00jrzges

00jrzges1#

问题是keyby中使用的keyselector与processwindowfunction中指定的键类型不匹配。您使用tuple2中的索引指定了键,因此编译器无法推断这些键将是字符串。在这种情况下,flink将键作为元组传递。
有几种方法可以解决这个问题。如果将keyby保持原样,则需要修改processwindowfunction以使用tuple作为键类型,如果要使用它,则必须将键强制转换为字符串。像这样的 ((Tuple1<String>)key).f0 . 更好的解决方案是使用更明确的键选择器,例如 keyBy(t -> t.f0) ,因此在编译时键是已知的字符串。

相关问题