我想用一个 ProcessWindowFunction
在我的apache-flink项目中。但我在使用process函数时遇到了一些错误,请参见下面的代码片段
错误是:
windowedstream,tuple,timewindow>类型中的方法进程(processwindowfunction,r,tuple,timewindow>)不适用于参数(jdbcexample.myprocesswindows)
我的程序:
DataStream<Tuple2<String, JSONObject>> inputStream;
inputStream = env.addSource(new JsonArraySource());
inputStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.process(new MyProcessWindows());
我的 ProcessWindowFunction
:
private class MyProcessWindows
extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{
public void process(
String key,
Context context,
Iterable<Tuple2<String, JSONObject>> input,
Collector<Tuple2<String, String>> out) throws Exception
{
...
}
}
2条答案
按热度按时间6kkfgxo01#
费边说:)使用
Tuple
应该管用,但确实涉及到一些丑陋的类型在您的模型ProcessWindowFunction
. 使用KeySelector
简单,代码更清晰。例如然后,通过上面的内容可以定义
ProcessWindowFunction
比如:0aydgbwb2#
问题可能是
ProcessWindowFunction
.您正在按位置引用密钥(
keyBy(0)
). 因此,编译器无法推断其类型(String
)你需要改变一下ProcessWindowFunction
收件人:通过替换
String
由Tuple
现在您有了一个可以强制转换到的键的通用占位符Tuple1<String>
当您需要访问processElement()
方法:如果定义了
KeySelector<IN, KEY>
函数提取键,因为返回类型KEY
的KeySelector
编译器已知。