I am trying to perform an over-window operation on a batch table using the Table API.
My batch table has the following schema:
root
|-- startdate: TIMESTAMP(3)
|-- enddate: TIMESTAMP(3)
|-- quantity: LEGACY(BigDecimal)
|-- productid: LEGACY(BigDecimal)
Based on this schema, I am trying to run the following over-window aggregation:
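import org.apache.flink.api.scala._
import org.apache.flink.table.api.Over
import org.apache.flink.table.api.scala._

// fsEnv is the batch ExecutionEnvironment, created elsewhere
val env = BatchTableEnvironment.create(fsEnv)
val ds = ...
// The DataSet itself must be passed in; field names follow the schema above
val table = env.fromDataSet(ds, 'startdate, 'enddate, 'quantity, 'productid)
val result = table
  .window(Over.partitionBy("productid").orderBy("startdate").as("w"))
  .select("startdate, enddate, quantity.sum over w as sumquantity, productid")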
When executing the above code, I get the following error:
Ordering must be defined on a time attribute.
However, when I looked at the Flink source code, I found the following Javadoc comment:
/**
 * Specifies the time attribute on which rows are ordered.
 *
 * <p>For streaming tables, reference a rowtime or proctime time attribute here
 * to specify the time mode.
 *
 * <p>For batch tables, refer to a timestamp or long attribute.
 *
 * @param orderBy field reference
 * @return an over window with defined order
 */
public OverWindowPartitionedOrdered orderBy(String orderBy) {
    return this.orderBy(ExpressionParser.parseExpression(orderBy));
}
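This clearly states: "For batch tables, refer to a timestamp or long attribute." As you can see, the startdate attribute is of type TIMESTAMP. I am using Flink 1.9.
Is this a bug, or am I missing something?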
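For reference, the following plain-Scala sketch (no Flink involved) spells out what the query is meant to compute, so Flink's output can be checked against it once the query runs. It assumes that an Over window with no preceding clause uses the frame RANGE UNBOUNDED PRECEDING up to the current row. Under that assumption, rows of the same product with an equal startdate share one sum. The names InputRow, OutputRow and runningSum are hypothetical and exist only for this illustration.

```scala
import java.sql.Timestamp
import java.math.BigDecimal

// Hypothetical row types mirroring the schema above
// (the LEGACY(BigDecimal) columns are modeled as java.math.BigDecimal).
final case class InputRow(startdate: Timestamp, enddate: Timestamp,
                          quantity: BigDecimal, productid: BigDecimal)
final case class OutputRow(startdate: Timestamp, enddate: Timestamp,
                           sumquantity: BigDecimal, productid: BigDecimal)

// Plain Scala, not Flink. Computes, for every row,
//   SUM(quantity) OVER (PARTITION BY productid ORDER BY startdate
//                       RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
// Assumption: this RANGE frame is the default when no preceding is given,
// so rows of one product with an equal startdate (peers) get the same sum.
def runningSum(rows: Seq[InputRow]): Seq[OutputRow] =
  rows.groupBy(_.productid).values.toSeq.flatMap { partition =>
    partition.map { r =>
      // Sum all quantities in this product's partition whose startdate is <= this row's.
      val total = partition
        .filter(_.startdate.getTime <= r.startdate.getTime)
        .map(_.quantity)
        .foldLeft(BigDecimal.ZERO)((acc, q) => acc.add(q))
      OutputRow(r.startdate, r.enddate, total, r.productid)
    }
  }
```

The nested filter makes this quadratic per product. That is fine for checking a small sample, but it is not how a real engine evaluates the window.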