Apache Spark 在两个指定时间界限之间的时间间隔(3小时到2小时之前)启动SQL窗口

xtupzzrd  于 2022-12-30  发布在  Apache
关注(0)|答案(4)|浏览(215)

在Spark SQL中使用两个预定义边界指定窗口间隔的正确方法是什么?
我正试图在“3小时前到2小时前”的窗口上对表中的值进行求和。
运行此查询时:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;

这很有效。我得到了我期望的结果,iidoEe.落入2小时滚动窗口的值的总和。
现在,我需要的是使滚动窗口不绑定到当前行,而是考虑3小时前到2小时前之间的行。

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;

但我得到extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'}错误。
我还尝试了:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;

但我得到了不同的错误scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
我尝试的第三个选项是:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;

它并不像我们预期的那样工作cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch
我很难找到区间类型的文档,因为this link没有说明足够的信息,其他信息也是半生不熟的。至少我找到的是这样。

iibxawm4

iibxawm41#

由于距离区间不起作用,我不得不转向另一种方法,大致如下:

  • 准备需要执行计算的间隔列表
  • 对于每个间隔,运行计算
  • 这些迭代中的每一次产生 Dataframe
  • 在迭代之后,我们有一个 Dataframe 列表
  • 将所述列表中的 Dataframe 合并为一个更大的 Dataframe
  • 写出结果

在我的例子中,我必须为一天中的每个小时运行计算,并将那些“每小时”的结果,即24个 Dataframe 的列表,合并成一个“每日”的 Dataframe 。
从非常高的层次来看,代码如下所示:

val hourlyDFs = for ((hourStart, hourEnd) <- (hoursToStart, hoursToEnd).zipped) yield {
    val data = data.where($"hour" <= lit(hourEnd) && $"hour" >= lit(hourStart))
    // do stuff
    // return a data frame
}
hourlyDFs.toSeq().reduce(_.union(_))
qq24tv8q

qq24tv8q2#

获得相同结果的解决方法是计算最近3小时内的值的总和,然后减去最近2小时内的值的总和:

select *, 
sum(value) over (
     partition by a, b
     order by cast(time_value as timestamp)
     range between interval 3 hours preceding and current row) 
- 
sum(value) over (
     partition by a, b
     order by cast(time_value as timestamp)
     range between interval 2 hours preceding and current row) 
as sum_value
from my_temp_table;
62lalag4

62lalag43#

遇到了同样的问题,找到了一个简单的解决方案。就这样:

unix_timestamp(datestamp) - unix_timestamp(datestamp) < 10800 --3 hours in seconds

你也可以使用时间戳来提高可读性。(如果需要的话):

select unix_timestamp(date_format(current_timestamp, 'HH:mm:ss'), 'HH:mm:ss') <
       unix_timestamp('03:00:00', 'HH:mm:ss') --Used timestamp for readibility
fcipmucu

fcipmucu4#

我知道这是一个老问题,但我想我会抛出,我认为原来的问题是语法。
您已经:

RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING

但这是一个区间和一个整数。应该可以这样做:

RANGE BETWEEN interval 3 hours PRECEDING AND interval 2 hours PRECEDING

相关问题