代码如下:
env
.addSource(...)
.map(r => (0, r))
.keyBy(0)
.timeWindow(Time.seconds(30), Time.seconds(1))
.fold(mutable.HashSet[String](),(a:(Int,String),b:mutable.HashSet[String])=>a)
编译时出错,错误消息为:
错误:类windowedstream中缺少方法fold的参数;如果要将其视为部分应用的函数timewindow(time.seconds(30),time.seconds(1)).fold(mutable.hashsetstring,
但windowedstream类中定义的函数是:
公共折叠(r initialvalue,foldfunction)
1条答案
按热度按时间2izufjch1#
问题有两个方面:首先是
fold
函数需要FoldFunction
如果使用scala,则在第二个参数列表中传递。第二,第一个参数FoldFunction
应为聚合类型。因此,在您的情况下,它应该是mutable.HashSet[String]
. 下面的代码片段应该可以做到这一点:注意Flink的
fold
api调用已弃用。现在建议使用aggregate
api调用。