我用的是Flink 1.12
我写了一个简单的flink代码如下,它从StockSource读取股票记录,每个股票由三个字段组成:id,trade_date,price
。pt是过程时间。
当我运行下面的sql时,flink抱怨不支持对非时间属性字段进行排序,我想知道问题出在哪里。
sql为:
val sql =
"""
select pt, sum(price) from sourceTable
group by pt
order by pt
""".stripMargin(' ')
tenv.sqlQuery(sql).toRetractStream[Row].print()
Flink 代码为:
object OrderByProctime {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val ds: DataStream[Stock] = env.addSource(new StockSource())
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("sourceTable", ds, $"id", $"price", $"pt".proctime())
//ERROR:
//Sort on a non-time-attribute field is not supported. org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
val sql =
"""
select pt, sum(price) from sourceTable
group by pt
order by pt
""".stripMargin(' ')
tenv.sqlQuery(sql).toRetractStream[Row].print()
env.execute()
}
}
1条答案
按热度按时间jgwigjjp1#
Flink SQL将可以对物化视图状态进行垃圾收集的时态操作限制在具有所谓的 time attribute 的表中。这是一个时间戳字段,可以保证(至少大致上)是有序的--因此它必须是一个定义了水印(指定其无序程度)的事件时间列,或者它必须是一个处理时间列。
您使用的是较旧版本的Flink,我对它不太熟悉,但我认为您应该修改
Stock
类型,使其不包含pt
字段,然后在转换为Table时引入pt
字段作为附加字段https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1。(或者,您可以保持pt
字段不变,并添加另一列以在转换过程中用作时间属性。)注意,在指定
TimeCharacteristic.EventTime
的同时尝试使用处理时间,这有点不一致--但我不认为这实际上是一个问题。整个时间特征机制在Flink 1.12中已被弃用,不再起任何作用。