两种类型的窗口,时间窗口和“计数”窗口

w8rqjzmb  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(390)

我正在尝试将文件作为带有窗口的流来处理。
这是密码

object Prog {

  def main(args: Array[String]) : Unit = {
    org.apache.log4j.BasicConfigurator.configure()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = CsvTableSource
      .builder
      .path("src/main/resources/data.stream")
      .field("numPers", Types.INT)
      .field("TIMESTAMP", Types.STRING)
      .fieldDelimiter(",")
      .ignoreFirstLine
      .ignoreParseErrors
      .commentPrefix("%")
      .build()

    tableEnv.registerTableSource("Data", csvTableSource)

    val table = tableEnv.scan("Data")
      .filter("numPers > 10")
      .select("*")

    val ds = tableEnv.toAppendStream(table, classOf[Row])

    ds.print()
    env.execute()
  }
}

问题是如何在这里实现window,例如,只显示不超过一小时的值。或者第二个窗口类型,当我读到最后50个条目时。

4sup72z8

4sup72z81#

在流处理中,窗口是计算聚合的组。
您的用例似乎有所不同。如果你想保留最后一个 x 分钟或 y 最后的记录,而不是这需要用不同的sql表示。
保留最后一个 5 分钟大概是

SELECT * FROM Data d WHERE d.tstamp >  (now() - INTERVAL '5' MINUTE)

所以,这将是对某种时间戳属性的过滤器。
保留最后10行将是

SELECT * FROM Data d ORDER BY d.tstamp DESC LIMIT 10

但是,flink(1.5版)sql或表api还不支持这些操作。

相关问题