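I want to apply a time window of 60 minutes,
but the aggregation only returns windows in which values actually occur; windows without any values are not shown at all.
I would like windows that contain no values to show up with 0,
so that the data is more continuous.
For example: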
from pyspark.sql import Row
import pyspark.sql.functions as sf
from pyspark.sql.types import TimestampType

# sc is an existing SparkContext (for example, the one provided by the pyspark shell)
df = sc.parallelize(
[Row(datetime='2015/01/01 03:00:36', value = 2.0),
Row(datetime='2015/01/01 03:40:12', value = 3.0),
Row(datetime='2015/01/01 05:25:30', value = 1.0)]).toDF()
df1 = df.select(sf.unix_timestamp(sf.column("datetime"), 'yyyy/MM/dd HH:mm:ss').cast(TimestampType()).alias("timestamp"), sf.column("value"))
df1.groupBy(sf.window(sf.col("timestamp"), "60 minutes")).agg(sf.sum("value")).show(truncate = False)
The result I get is:
+------------------------------------------+----------+
|window                                    |sum(value)|
+------------------------------------------+----------+
|[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0       |
|[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0       |
+------------------------------------------+----------+
However, I would prefer the output to be:
+------------------------------------------+----------+
|window                                    |sum(value)|
+------------------------------------------+----------+
|[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0       |
|[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0       |
|[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0       |
+------------------------------------------+----------+
EDIT:
How would I then extend this to a groupBy on two keys, so that every "name" gets the same number of windows?
df = sc.parallelize(
[Row(name = 'ABC', datetime = '2015/01/01 03:00:36', value = 2.0),
Row(name = 'ABC', datetime = '2015/01/01 03:40:12', value = 3.0),
Row(name = 'ABC', datetime = '2015/01/01 05:25:30', value = 1.0),
Row(name = 'XYZ', datetime = '2015/01/01 05:15:30', value = 2.0)]).toDF()
df1 = df.select('name', sf.unix_timestamp(sf.column("datetime"), 'yyyy/MM/dd HH:mm:ss').cast(TimestampType()).alias("timestamp"), sf.column("value"))
df1.show(truncate = False)
+----+-------------------+-----+
|name|timestamp          |value|
+----+-------------------+-----+
|ABC |2015-01-01 03:00:36|2.0  |
|ABC |2015-01-01 03:40:12|3.0  |
|ABC |2015-01-01 05:25:30|1.0  |
|XYZ |2015-01-01 05:15:30|2.0  |
+----+-------------------+-----+
I would like the result to be:
+----+------------------------------------------+----------+
|name|window                                    |sum(value)|
+----+------------------------------------------+----------+
|ABC |[2015-01-01 03:00:00, 2015-01-01 04:00:00]|5.0       |
|ABC |[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0       |
|ABC |[2015-01-01 05:00:00, 2015-01-01 06:00:00]|1.0       |
|XYZ |[2015-01-01 03:00:00, 2015-01-01 04:00:00]|0.0       |
|XYZ |[2015-01-01 04:00:00, 2015-01-01 05:00:00]|0.0       |
|XYZ |[2015-01-01 05:00:00, 2015-01-01 06:00:00]|2.0       |
+----+------------------------------------------+----------+
1 Answer
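This is actually the expected behavior of grouping by window, because there are no rows between 04:00 and 05:00, so no group exists for that hour. However, you can build the missing buckets in a separate DataFrame by using the sequence function, running from min(timestamp) to max(timestamp), each truncated to the hour. Then use the transform function on the generated sequence to create a struct holding the start and end time of each bucket. Finally, combine the buckets with the original DataFrame (a left join keeps the empty buckets) and group by the window column to sum value.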