pyspark聚合,同时查找组的第一个值

k4emjkb1  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(364)

假设我有5 tb的数据和下面的模式,我使用的是pyspark。

| id | date | Month | KPI_1 | ... | KPI_n

对于90%的KPI,我只需要知道sum/min/max值聚合到(id,month)级别。对于剩下的10%,我需要知道基于日期的第一个值。
我的一个选择是 window . 例如,我可以

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy("id", "Month").orderBy(F.desc("date"))

# for the 90% kpi

agg_df = df.withColumn("kpi_1", F.sum("kpi_1").over(w))
agg_df = agg_df.withColumn("kpi_2", F.max("kpi_2").over(w))
agg_df = agg_df.withColumn("kpi_3", F.min("kpi_3").over(w))
...

# Select last row for each window to get last accumulated sum for 90% kpis and last value for 10% kpi (which is equivalent to first value if ranked ascending).

# continue process agg_df with filters based on sum/max/min values of 90% KIPs.

但我不知道如何选择每个窗口的最后一行。有没有人有什么建议,或者有没有更好的方法来汇总?

up9lanfz

up9lanfz1#

假设我们有这些数据

+---+----------+-------+-----+-----+
| id|      date|  month|kpi_1|kpi_2|
+---+----------+-------+-----+-----+
|  1|2000-01-01|2000-01|    1|  100|
|  1|2000-01-02|2000-01|    2|  200|
|  1|2000-01-03|2000-01|    3|  300|
|  1|2000-01-04|2000-01|    4|  400|
|  1|2000-01-05|2000-01|    5|  500|
|  1|2000-02-01|2000-02|   10|   11|
|  1|2000-02-02|2000-02|   20|   21|
|  1|2000-02-03|2000-02|   30|   31|
|  1|2000-02-04|2000-02|   40|   41|
+---+----------+-------+-----+-----+

我们要计算最小值,最大值和 kpi_1 得到的最后一个值 kpi_2 每组。
通过对数据进行分组,可以得到最小值、最大值和和 id 以及 month :

df_avg = df \
    .groupBy("id","month") \
    .agg(F.sum("kpi_1"), F.min("kpi_1"), F.max("kpi_1"), F.first("kpi_2"))\
    .select("id", "month", "sum(kpi_1)", "min(kpi_1)", "max(kpi_1)")
df_avg.show()

印刷品

+---+-------+----------+----------+----------+
| id|  month|sum(kpi_1)|min(kpi_1)|max(kpi_1)|
+---+-------+----------+----------+----------+
|  1|2000-02|       100|        10|        40|
|  1|2000-01|        15|         1|         5|
+---+-------+----------+----------+----------+

获取的最后一个值 kpi_2 对每个小组来说都比较困难。第一个想法是在降序Dataframe上使用聚合函数first()。一个简单的测试给了我正确的结果,但不幸的是,文档中指出“函数是不确定的,因为它的结果取决于行的顺序,而行的顺序在洗牌之后可能是不确定的”。
一种更好的方法来获取 kpi_2 就是使用问题中显示的窗口。因为窗口函数row\u number()可以工作:

w = Window.partitionBy("id", "Month").orderBy(F.desc("date"))
df_first = df.withColumn("row_number", F.row_number().over(w)).where("row_number = 1")\
    .drop("row_number") \
    .select("id", "month", "KPI_2")
df_first.show()

印刷品

+---+-------+-----+
| id|  month|KPI_2|
+---+-------+-----+
|  1|2000-02|   41|
|  1|2000-01|  500|
+---+-------+-----+

将两个部分连接在一起可以得到所需的结果:

df_result = df_avg.join(df_first, ['id', 'month'])
df_result.show()

印刷品

+---+-------+----------+----------+----------+-----+
| id|  month|sum(kpi_1)|min(kpi_1)|max(kpi_1)|KPI_2|
+---+-------+----------+----------+----------+-----+
|  1|2000-02|       100|        10|        40|   41|
|  1|2000-01|        15|         1|         5|  500|
+---+-------+----------+----------+----------+-----+

相关问题