我有一些像这样的样本数据, df1
:
| id1 | id2 | yyyy_mm_dd |
|-----|------|------------|
| 1 | 3245 | 2021-01-01 |
| 1 | 4564 | 2021-01-01 |
| 1 | 3546 | 2021-01-01 |
| 1 | 632 | 2021-01-01 |
| 1 | 521 | 2021-01-01 |
| 2 | 7413 | 2021-01-01 |
| ... | ... | ... |
我还有时间 df
跟踪一个 status
每天 id2
,命名为 df2
:
| yyyy_mm_dd | id2 | product | status |
|------------|------|---------|--------|
| 2021-01-01 | 3245 | p1 | i |
| 2021-01-01 | 3245 | p2 | f_c |
| 2021-01-01 | 3245 | p3 | n_c |
| 2021-01-01 | 4564 | p1 | n_c |
| 2021-01-01 | 4564 | p2 | n_c |
| 2021-01-01 | 4564 | p3 | n_c |
| 2021-01-01 | 3546 | p1 | f_c |
| 2021-01-01 | 3546 | p2 | n_c |
| 2021-01-01 | 3546 | p3 | n_c |
| 2021-01-01 | 7413 | p1 | f_c |
| ... | ... | ... | .. |
我想创建一个输出Dataframe id1
继承 status
从 id2
. 我面临的问题是,这两者之间存在一对多的关系 id1
以及 id2
所以很难继承身份。
考虑到这一点,我想 greatest
/ max
但这也很困难,因为它们是字符串。不过,有这样一种等级制度 i > f_c > n_c
.
基于以上,我希望我的输出如下所示:
| yyyy_mm_dd | id1 | product | status |
|------------|-----|---------|--------|
| 2020-01-01 | 1 | p1 | i |
| 2020-01-01 | 1 | p2 | f_c |
| 2020-01-01 | 1 | p3 | n_c |
| 2020-01-01 | 2 | p1 | f_c |
有了输出, id1
=1继承
i status
为了 p1
因为 i
是世界上最伟大的地位 id2
= (3245, 4564, 3546)
. 同样的情况也可以看到 id1
=1和 p2
,作为 f_c
最终成为 status
因为它是世界上最大的 id2
= (3245, 4564, 3546)
.
我知道我可以这样加入数据:
df3 = (
df1
.join(df2, on = ['yyyy_mm_dd', 'id2']
)
但我不知道该怎么做 status
在 id2
因为它不是数字。
1条答案
按热度按时间nfzehxib1#
你可以加入
df
与df2
在id2
以及yyyy_mm_dd
列,然后计算行数和orderby when表达式,以便对status
```from pyspark.sql import functions as F, Window
result = df1.join(df2, ["yyyy_mm_dd", "id2"]).withColumn(
"rn",
F.row_number().over(
Window.partitionBy("yyyy_mm_dd", "id1", "product").orderBy(
F.when(F.col("status") == "i", 1).when(F.col("status") == "f_c", 2).when(F.col("status") == "n_c", 3)
)
)
).filter("rn = 1").drop("id2", "rn")
result.show()
+----------+---+-------+------+
|yyyy_mm_dd|id1|product|status|
+----------+---+-------+------+
|2021-01-01| 1| p2| f_c|
|2021-01-01| 2| p1| f_c|
|2021-01-01| 1| p1| i |
|2021-01-01| 1| p3| n_c|
+----------+---+-------+------+
result = df1.join(df2, ["yyyy_mm_dd", "id2"]).groupBy("yyyy_mm_dd", "id1", "product").agg(
F.max(
F.when(F.col("status") == "i", 3).when(F.col("status") == "f_c", 2).when(F.col("status") == "n_c", 1)
).alias("max_status")
).select(
"yyyy_mm_dd", "id1", "product",
F.when(F.col("max_status") == 3, "i")
.when(F.col("max_status") == 2, "f_c")
.when(F.col("max_status") == 1, "n_c").alias("status")
)