flink表/sqlapi:在会话窗口聚合之后修改rowtime属性

w6lpcovy  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(386)

我想用 Session 窗口聚合,然后运行 Tumble 在生成的结果之上的窗口聚合 Table API/Flink SQL .
可以修改吗 rowtime 第一个之后的属性 session 使之等于 .rowtime 最后一次观察到的事件?
我试着这样做:

table
  .window(Session withGap 2.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select(
    'userId,
    ('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
    ('w.rowtime - 2.minutes) as 'rowtime
  )
  .window(Tumble over 5.minutes on 'rowtime as 'w)
  .groupBy('w)
  .select(
    'w.start,
    'w.end,
    'sessionDuration.avg as 'avgSession,
    'sessionDuration.count as 'numberOfSession
  )

关键是:

('w.rowtime - 2.minutes) as 'rowtime

所以我想重新分配一张唱片 .rowtime 最新的事件,没有会议间隙( 2.minutes 在本例中)。
这在batchtable中运行良好,但在streamtable中不起作用:

Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.

是的,我知道,感觉我不想发明时间机器,改变时间的顺序。但是,有没有可能以某种方式实现所描述的行为呢?

yqhsw0fo

yqhsw0fo1#

不,不幸的是,在当前版本(1.6.0)中,不能使用sql或表api来实现这一点。一旦您修改了时间属性(rowtime或proctime),它就变成了一个常规属性 TIMESTAMP 属性,失去了它特有的时间特征。
对于rowtime属性,原因是我们不能保证时间戳仍然与水印对齐。原则上,我们可以通过减去的时间间隔来延迟水印,但这还不受支持。

相关问题