Applying groupBy over windows in a continuous way

ufj5ltwl, posted 2021-07-13 in Spark
Follow (0) | Answers (1) | Views (279)

I want to apply a time window of 60 minutes, but it only produces rows for the windows in which values actually occur; windows with no values do not appear at all.
I would like windows that contain no values to show 0 instead, so that the data is more continuous.
For example:

from pyspark.sql import Row
from pyspark.sql import functions as sf
from pyspark.sql.types import TimestampType

df = sc.parallelize(
  [Row(datetime='2015/01/01 03:00:36', value = 2.0),
   Row(datetime='2015/01/01 03:40:12', value = 3.0),
   Row(datetime='2015/01/01 05:25:30', value = 1.0)]).toDF()

df1 = df.select(sf.unix_timestamp(sf.column("datetime"), 'yyyy/MM/dd HH:mm:ss').cast(TimestampType()).alias("timestamp"), sf.column("value"))

df1.groupBy(sf.window(sf.col("timestamp"), "60 minutes")).agg(sf.sum("value")).show(truncate = False)

The result I get is:

+------------------------------------------+----------+
|window                                    |sum(value)|
+------------------------------------------+----------+
|[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0       |
|[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0       |
+------------------------------------------+----------+

However, I would prefer the output to be:

+------------------------------------------+----------+
|window                                    |sum(value)|
+------------------------------------------+----------+
|[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0       |
|[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0       |
|[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0       |
+------------------------------------------+----------+

Edit:
Further, how can this be extended to a two-level groupBy, with an equal number of windows for each "name":

df = sc.parallelize(
  [Row(name = 'ABC', datetime = '2015/01/01 03:00:36', value = 2.0),
   Row(name = 'ABC', datetime = '2015/01/01 03:40:12', value = 3.0),
   Row(name = 'ABC', datetime = '2015/01/01 05:25:30', value = 1.0),
   Row(name = 'XYZ', datetime = '2015/01/01 05:15:30', value = 2.0)]).toDF()

df1 = df.select('name', sf.unix_timestamp(sf.column("datetime"), 'yyyy/MM/dd HH:mm:ss').cast(TimestampType()).alias("timestamp"), sf.column("value"))

df1.show(truncate = False)

+----+-------------------+-----+
|name|timestamp          |value|
+----+-------------------+-----+
|ABC |2015-01-01 03:00:36|2.0  |
|ABC |2015-01-01 03:40:12|3.0  |
|ABC |2015-01-01 05:25:30|1.0  |
|XYZ |2015-01-01 05:15:30|2.0  |
+----+-------------------+-----+

I would like the result to be:

+----+------------------------------------------+----------+
|name|window                                    |sum(value)|
+----+------------------------------------------+----------+
|ABC |[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0       |
|ABC |[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0       |
|ABC |[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0       |
|XYZ |[2015-01-01 03:00:00, 2015-01-01 04:00:00]|0.0       |
|XYZ |[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0       |
|XYZ |[2015-01-01 05:00:00, 2015-01-01 06:00:00]|2.0       |
+----+------------------------------------------+----------+

fdx2calv1#

This is actually the expected behavior of a grouped window, because there is no corresponding row between hour 4 and hour 5.
However, you can generate the missing buckets in a separate DataFrame by using the sequence function with min(timestamp) and max(timestamp), truncated to the hour, as the start and end points. Then, use the transform function on the generated sequence to create a struct with the start and end time of each bucket:

from pyspark.sql import functions as sf

buckets = df1.agg(
    sf.expr("""transform(
                sequence(date_trunc('hour', min(timestamp)), 
                         date_trunc('hour', max(timestamp)), 
                         interval 1 hour
                ),
                x -> struct(x as start, x + interval 1 hour as end)
              )
    """).alias("buckets")
).select(sf.explode("buckets").alias("window"))

buckets.show(truncate=False)

# +------------------------------------------+
# |window                                    |
# +------------------------------------------+
# |[2015-01-01 03:00:00, 2015-01-01 04:00:00]|
# |[2015-01-01 04:00:00, 2015-01-01 05:00:00]|
# |[2015-01-01 05:00:00, 2015-01-01 06:00:00]|
# +------------------------------------------+
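
As a side note, the same buckets can also be built with the DataFrame API instead of a SQL expression string. The following is only a sketch (the names hours and buckets_alt are illustrative) and assumes Spark 2.4+, where sequence is available:

# Hour starts from min(timestamp) to max(timestamp), truncated to the hour
hours = df1.agg(
    sf.sequence(
        sf.date_trunc("hour", sf.min("timestamp")),
        sf.date_trunc("hour", sf.max("timestamp")),
        sf.expr("interval 1 hour")
    ).alias("hours")
).select(sf.explode("hours").alias("hour_start"))

# window() on an hour-aligned timestamp yields the same [start, end) struct,
# so buckets_alt contains the same rows as buckets above and can replace it
buckets_alt = hours.select(sf.window("hour_start", "1 hour").alias("window"))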

Now, left join the buckets with the original DataFrame, group by the window column, and sum value:

df2 = buckets.join(
    df1,
    (sf.col("timestamp") >= sf.col("window.start")) &
    (sf.col("timestamp") < sf.col("window.end")),
    "left"
).groupBy("window").agg(
    sf.sum(sf.coalesce(sf.col("value"), sf.lit(0))).alias("sum")
)

df2.show(truncate=False)

# +------------------------------------------+---+
# |window                                    |sum|
# +------------------------------------------+---+
# |[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0|
# |[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0|
# |[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0|
# +------------------------------------------+---+
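
To also get the per-name result asked for in the edit, one possible extension of the same idea is to cross join the buckets with the distinct names before the left join, so that every name is paired with every bucket. This is only a sketch building on the code above (names, grp_name and df3 are illustrative names), and it assumes buckets has been rebuilt in the same way from the edit's df1, which contains the name column:

# Every name paired with every bucket, so empty windows survive per name
names = df1.select("name").distinct().withColumnRenamed("name", "grp_name")

df3 = buckets.crossJoin(names).join(
    df1,
    (sf.col("timestamp") >= sf.col("window.start")) &
    (sf.col("timestamp") < sf.col("window.end")) &
    (sf.col("grp_name") == sf.col("name")),
    "left"
).groupBy("grp_name", "window").agg(
    sf.sum(sf.coalesce(sf.col("value"), sf.lit(0))).alias("sum")
).withColumnRenamed("grp_name", "name").orderBy("name", "window")

df3.show(truncate=False)

The trailing orderBy is optional; it only makes the rows appear in the window order shown in the question.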
