Flink抱怨当按proctime分组时,不支持对非时间属性字段进行排序

eit6fx6z  于 2023-03-06  发布在  Apache
关注(0)|答案(1)|浏览(781)

我用的是Flink 1.12
我写了一个简单的flink代码如下,它从StockSource读取股票记录,每个股票由三个字段组成:id,trade_date,price。pt是过程时间。
当我运行下面的sql时,flink抱怨不支持对非时间属性字段进行排序,我想知道问题出在哪里。
sql为:

val sql =
    """
      select  pt, sum(price) from sourceTable
      group by  pt
      order by pt

      """.stripMargin(' ')

    tenv.sqlQuery(sql).toRetractStream[Row].print()

Flink 代码为:

object OrderByProctime {
  
   def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val ds: DataStream[Stock] = env.addSource(new StockSource())
    val tenv = StreamTableEnvironment.create(env)
    tenv.createTemporaryView("sourceTable", ds, $"id", $"price", $"pt".proctime())

    //ERROR:
    //Sort on a non-time-attribute field is not supported. org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
    val sql =
    """
      select  pt, sum(price) from sourceTable
      group by  pt
      order by pt

      """.stripMargin(' ')

    tenv.sqlQuery(sql).toRetractStream[Row].print()

    env.execute()
  }
}
jgwigjjp

jgwigjjp1#

Flink SQL将可以对物化视图状态进行垃圾收集的时态操作限制在具有所谓的 time attribute 的表中。这是一个时间戳字段,可以保证(至少大致上)是有序的--因此它必须是一个定义了水印(指定其无序程度)的事件时间列,或者它必须是一个处理时间列。
您使用的是较旧版本的Flink,我对它不太熟悉,但我认为您应该修改Stock类型,使其不包含pt字段,然后在转换为Table时引入pt字段作为附加字段https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1。(或者,您可以保持pt字段不变,并添加另一列以在转换过程中用作时间属性。)
注意,在指定TimeCharacteristic.EventTime的同时尝试使用处理时间,这有点不一致--但我不认为这实际上是一个问题。整个时间特征机制在Flink 1.12中已被弃用,不再起任何作用。

相关问题