Spark window functions - rangeBetween with dates

9q78igpj · asked on 2021-05-27 · Spark
2 answers · 737 views

I have a Spark DataFrame and I want to get all rows that precede the current row within a given date range. For example, I want to get all rows from the 7 days before the given row. I figured I need a Window Function such as:

Window \
    .partitionBy('id') \
    .orderBy('start')

Here comes the problem: I want to have a rangeBetween of 7 days, but I cannot find anything about it in the Spark documentation. Does Spark even provide such an option? Right now I just get all the preceding rows with:

.rowsBetween(-sys.maxsize, 0)

But I would like to have something like:

.rangeBetween("7 days", 0)

I would really appreciate help on this one. Thanks in advance!

x759pob2 · answer 1

Wonderful solution, @zero323. If you want to operate with minutes instead of days, as I had to, and you don't need to partition by id, you only have to modify part of the code as I show here:

df.createOrReplaceTempView("df")
spark.sql(
    """SELECT *, sum(total) OVER (
        ORDER BY CAST(reading_date AS timestamp) 
        RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
     ) AS sum_total FROM df""").show()
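
For completeness, here is a minimal DataFrame-API sketch of the same 45-minute frame, assuming the frame has a reading_date timestamp column and a numeric total column; it just applies the epoch-seconds trick from the answer below:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as sum_

# Order by epoch seconds so the frame bounds can be given in seconds;
# 45 minutes = 45 * 60 seconds. No partitionBy, matching the SQL above.
w = (Window
    .orderBy(col("reading_date").cast("timestamp").cast("long"))
    .rangeBetween(-45 * 60, 0))

df.select(col("*"), sum_("total").over(w).alias("sum_total")).show()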
bpsygsoo · answer 2

Spark >= 2.3
Since Spark 2.3 it is possible to use interval objects with the SQL API, but DataFrame API support is still work in progress.

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+
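
If you would rather stay in the DataFrame API on Spark >= 2.3, the same interval frame can presumably be pushed through expr, which hands the window expression to the SQL parser (a sketch, not tested on every version):

from pyspark.sql.functions import expr

df.withColumn(
    "mean",
    expr("""mean(some_value) OVER (
        PARTITION BY id
        ORDER BY CAST(start AS timestamp)
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
    )""")
).show()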

Spark < 2.3
As far as I know it is not possible directly, neither in Spark nor in Hive. Both require the ORDER BY clause used with RANGE to be numeric. The closest thing I found is conversion to a timestamp and operating on seconds. Assuming the start column contains a date type:

from pyspark.sql import Row
from pyspark.sql.functions import col

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

A small helper and the window definition:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col

# Hive timestamp is interpreted as UNIX timestamp in seconds*

days = lambda i: i * 86400
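
Because the ordering column is cast to epoch seconds in the window below, the frame bounds have to be given in seconds as well; the helper only performs that conversion:

# 7 days expressed in seconds (7 * 24 * 60 * 60)
assert days(7) == 604800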

And finally the query:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Not pretty at all, but it works.
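
As a quick sanity check against the output above, take the id = 1 row for 2015-01-12: 2015-01-01 is 11 days earlier and falls outside the 7-day frame, so only the last three values contribute to the mean:

# values for 2015-01-06, 2015-01-07 and 2015-01-12
print((10.0 + 25.0 + 30.0) / 3)  # 21.666666666666668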

  • Hive Language Manual, Types
