在pyspark中使用自定义顺序选择最大/最大值

z9ju0rcb  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(254)

我有一些像这样的样本数据, df1 :

| id1 | id2  | yyyy_mm_dd |
|-----|------|------------|
| 1   | 3245 | 2021-01-01 |
| 1   | 4564 | 2021-01-01 |
| 1   | 3546 | 2021-01-01 |
| 1   | 632  | 2021-01-01 |
| 1   | 521  | 2021-01-01 |
| 2   | 7413 | 2021-01-01 |
| ... | ...  | ...        |

我还有时间 df 跟踪一个 status 每天 id2 ,命名为 df2 :

| yyyy_mm_dd | id2  | product | status |
|------------|------|---------|--------|
| 2021-01-01 | 3245 | p1      | i      |
| 2021-01-01 | 3245 | p2      | f_c    |
| 2021-01-01 | 3245 | p3      | n_c    |
| 2021-01-01 | 4564 | p1      | n_c    |
| 2021-01-01 | 4564 | p2      | n_c    |
| 2021-01-01 | 4564 | p3      | n_c    |
| 2021-01-01 | 3546 | p1      | f_c    |
| 2021-01-01 | 3546 | p2      | n_c    |
| 2021-01-01 | 3546 | p3      | n_c    |
| 2021-01-01 | 7413 | p1      | f_c    |
| ...        | ...  | ...     | ..     |

我想创建一个输出Dataframe id1 继承 statusid2 . 我面临的问题是,这两者之间存在一对多的关系 id1 以及 id2 所以很难继承身份。
考虑到这一点,我想 greatest / max 但这也很困难,因为它们是字符串。不过,有这样一种等级制度 i > f_c > n_c .
基于以上,我希望我的输出如下所示:

| yyyy_mm_dd | id1 | product | status |
|------------|-----|---------|--------|
| 2020-01-01 | 1   | p1      | i      |
| 2020-01-01 | 1   | p2      | f_c    |
| 2020-01-01 | 1   | p3      | n_c    |
| 2020-01-01 | 2   | p1      | f_c    |

有了输出, id1 =1继承
i status 为了 p1 因为 i 是世界上最伟大的地位 id2 = (3245, 4564, 3546) . 同样的情况也可以看到 id1 =1和 p2 ,作为 f_c 最终成为 status 因为它是世界上最大的 id2 = (3245, 4564, 3546) .
我知道我可以这样加入数据:

df3 = (
    df1
    .join(df2, on = ['yyyy_mm_dd', 'id2']
)

但我不知道该怎么做 statusid2 因为它不是数字。

nfzehxib

nfzehxib1#

你可以加入 dfdf2id2 以及 yyyy_mm_dd 列,然后计算行数和orderby when表达式,以便对 status ```
from pyspark.sql import functions as F, Window

result = df1.join(df2, ["yyyy_mm_dd", "id2"]).withColumn(
"rn",
F.row_number().over(
Window.partitionBy("yyyy_mm_dd", "id1", "product").orderBy(
F.when(F.col("status") == "i", 1).when(F.col("status") == "f_c", 2).when(F.col("status") == "n_c", 3)
)
)
).filter("rn = 1").drop("id2", "rn")

result.show()

+----------+---+-------+------+

|yyyy_mm_dd|id1|product|status|

+----------+---+-------+------+

|2021-01-01| 1| p2| f_c|

|2021-01-01| 2| p1| f_c|

|2021-01-01| 1| p1| i |

|2021-01-01| 1| p3| n_c|

+----------+---+-------+------+

或者如果你喜欢用 `groupBy` 与 `max` :

result = df1.join(df2, ["yyyy_mm_dd", "id2"]).groupBy("yyyy_mm_dd", "id1", "product").agg(
F.max(
F.when(F.col("status") == "i", 3).when(F.col("status") == "f_c", 2).when(F.col("status") == "n_c", 1)
).alias("max_status")
).select(
"yyyy_mm_dd", "id1", "product",
F.when(F.col("max_status") == 3, "i")
.when(F.col("max_status") == 2, "f_c")
.when(F.col("max_status") == 1, "n_c").alias("status")
)

相关问题