pyspark-提取24小时窗口列的最大值,然后删除重复项

yduiuuwa  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(404)

我需要找到 max_value 为了 value 对于每个 id . 只需记录一次最大值,并在当天的第一次记录时记录 date ```
+---+-------------------+-----+----------+
| id| date|value| date_only|
+---+-------------------+-----+----------+
| J1|2016-10-01 00:00:00| Null|2016-10-01|
| J1|2016-10-01 01:00:00| 1|2016-10-01|
| J1|2016-10-01 12:30:30| 3|2016-10-01|
| J9|2016-10-06 00:00:00| 2|2016-10-06|
| J9|2016-10-06 09:20:00| 4|2016-10-06|
| J9|2016-10-06 09:20:00| Null|2016-10-06|
+---+-------------------+-----+----------+

所需Dataframe:

+---+-------------------+-----+----------+---------+
| id| date|value| date_only|max_value|
+---+-------------------+-----+----------+---------+
| J1|2016-10-01 00:00:00| Null|2016-10-01| 3|
| J1|2016-10-01 01:00:00| 1|2016-10-01| Null|
| J1|2016-10-01 12:30:30| 3|2016-10-01| Null|
| J9|2016-10-06 00:00:00| 2|2016-10-06| 4|
| J9|2016-10-06 09:20:00| 4|2016-10-06| Null|
| J9|2016-10-06 09:20:00| Null|2016-10-06| Null|
+---+-------------------+-----+----------+---------+

我尝试的是:
这只保留groupby变量,并从新Dataframe中排除所有其他变量(100)

df = df.groupBy("id", "date_only").agg(max("max_value").alias("max_value1")).sort('date_only')

下面是我尝试过的示例代码。每个日期有96行,但有时不匹配。是否有另一个类似pandates datetime的函数,或者是否需要删除arg之间的行?

w = Window.partitionBy("ID").orderBy(F.col("date_only").cast('long'))

main = main.withColumn('max_value', F.max("value").over(w))

我也尝试过,但是它没有改变列中的重复值:

df.groupBy("ID", "Date").agg(first("max_value").alias("max_value"), count("*").alias("cn"))
.withColumn("max_value", when(col("cn") > lit(1), lit(None)).otherwise(col("max_value")))

5vf7fwbs

5vf7fwbs1#

您可以将两个winspec与其默认帧一起使用(下面来自doc):
注意:未定义排序时,默认情况下使用无界窗口框架(rowframe、unboundpreceding、unboundfollowing)。定义排序时,默认情况下使用增长窗口帧(rangeframe、UnboundPreceding、currentrow)。

from pyspark.sql import Window, functions as F
w1 = Window.partitionBy('id') 
w2 = Window.partitionBy('id', F.col('date').astype('date')).orderBy('date')

# set up the first records on each day regardless if or not the date is ending with `00:00:00`

df.withColumn('max_value', F.when(F.row_number().over(w2)==1, F.max('value').over(w1))).show()
+---+-------------------+-----+----------+---------+                            
| id|               date|value| date_only|max_value|
+---+-------------------+-----+----------+---------+
| J1|2016-10-01 00:00:00| null|2016-10-01|        3|
| J1|2016-10-01 01:00:00|    1|2016-10-01|     null|
| J1|2016-10-01 12:30:30|    3|2016-10-01|     null|
| J9|2016-10-06 00:00:00|    2|2016-10-06|        4|
| J9|2016-10-06 09:20:00|    4|2016-10-06|     null|
| J9|2016-10-06 09:20:00| null|2016-10-06|     null|
+---+-------------------+-----+----------+---------+

编辑:根据评论,调整以结尾的所有日期 00:00:00 至最大值:

df.withColumn('max_value', F.when(F.col('date') == F.date_trunc('day', 'date'), F.max('value').over(w1))).show()

相关问题