现在,我们在flink中使用了具有奇特窗口的sql,我正在尝试使用“在未来flink发布的表api和sql中可能出现的情况”来引用衰减的移动平均值。来自他们的sql路线图/预览2017-03帖子:
table
.window(Slide over 1.hour every 1.second as 'w)
.groupBy('productId, 'w)
.select(
'w.end,
'productId,
('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)
以下是我的尝试(也受到方解石衰变例子的启发):
SELECT
lb_index one_key,
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
SUM(Y *
EXP(
proctime -
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
))
FROM write_position
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
time是处理时间,我们从appendstream表中创建write\u位置得到proctime,如下所示:
tEnv.registerTable(
"write_position",
tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))
我得到这个错误:
Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'.
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'
我试过把时间投射到我所知道的每一种类型上(试图到达数字的应许之地),但我就是找不到如何让它发挥作用。
我错过什么了吗?proctime是一种非常特殊的“系统变更号”时间,你不能转换吗?如果是这样的话,还必须有某种方法将其与hop\u start(proctime,…)值进行比较。
1条答案
按热度按时间knpiaxh11#
您可以使用timestampdiff减去两个时间点(参见文档)。你是这样用的
其中时间点单位可以是秒、分钟、小时、天、月或年。
我还没有尝试过处理时间,但它确实适用于事件时间字段,所以希望它能。