在Spark SQL中使用两个预定义边界指定窗口间隔的正确方法是什么?
我正试图在“3小时前到2小时前”的窗口上对表中的值进行求和。
运行此查询时:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;
这很有效。我得到了我期望的结果,iidoEe.落入2小时滚动窗口的值的总和。
现在,我需要的是使滚动窗口不绑定到当前行,而是考虑3小时前到2小时前之间的行。
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;
但我得到extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'}
错误。
我还尝试了:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;
但我得到了不同的错误scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
我尝试的第三个选项是:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;
它并不像我们预期的那样工作cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch
我很难找到区间类型的文档,因为this link没有说明足够的信息,其他信息也是半生不熟的。至少我找到的是这样。
4条答案
按热度按时间iibxawm41#
由于距离区间不起作用,我不得不转向另一种方法,大致如下:
在我的例子中,我必须为一天中的每个小时运行计算,并将那些“每小时”的结果,即24个 Dataframe 的列表,合并成一个“每日”的 Dataframe 。
从非常高的层次来看,代码如下所示:
qq24tv8q2#
获得相同结果的解决方法是计算最近3小时内的值的总和,然后减去最近2小时内的值的总和:
62lalag43#
遇到了同样的问题,找到了一个简单的解决方案。就这样:
你也可以使用时间戳来提高可读性。(如果需要的话):
fcipmucu4#
我知道这是一个老问题,但我想我会抛出,我认为原来的问题是语法。
您已经:
但这是一个区间和一个整数。应该可以这样做: