我正在尝试将文件作为带有窗口的流来处理。
这是密码
object Prog {
def main(args: Array[String]) : Unit = {
org.apache.log4j.BasicConfigurator.configure()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val csvTableSource = CsvTableSource
.builder
.path("src/main/resources/data.stream")
.field("numPers", Types.INT)
.field("TIMESTAMP", Types.STRING)
.fieldDelimiter(",")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")
.build()
tableEnv.registerTableSource("Data", csvTableSource)
val table = tableEnv.scan("Data")
.filter("numPers > 10")
.select("*")
val ds = tableEnv.toAppendStream(table, classOf[Row])
ds.print()
env.execute()
}
}
问题是如何在这里实现window,例如,只显示不超过一小时的值。或者第二个窗口类型,当我读到最后50个条目时。
1条答案
按热度按时间4sup72z81#
在流处理中,窗口是计算聚合的组。
您的用例似乎有所不同。如果你想保留最后一个
x
分钟或y
最后的记录,而不是这需要用不同的sql表示。保留最后一个
5
分钟大概是所以,这将是对某种时间戳属性的过滤器。
保留最后10行将是
但是,flink(1.5版)sql或表api还不支持这些操作。