我用的是老版本的flink。我升级到了1.2.0,并且我在过滤器方面遇到了一些问题。
我有一个日志数据流,它工作得很好:
val logs: DataStream[Log] = env.addSource(new LogSource(
data, delay, factor))
// DISPLAY TUPLE IN CONSOLE
logs.print()
// EXECUTE SCRIPT
env.execute("stream")
我当然已经阅读了文件,其中显示:
dataStream.filter { _ != 0 }
我试过很多类似的方法:
val cleanLogs = logs.filter { _.isComplete }
但我有以下错误:
类型不匹配,应为:filterfunction[log],actual:(any)=>an
所以我看不到文档和这个错误之间的联系。有什么帮助吗?举例说明?
谢谢
1条答案
按热度按时间eqqqjvef1#
问题首先是进口错误
StreamExecutionEnvironment
导致这个问题的基本函数是filter
.当我使用旧版本的flink时
LocalExecutionEnvironment
flink 1.x中不再提供的类。取而代之的是:
StreamExecutionEnvironment.createLocalEnvironment(1)