我想按列对给定的Dataframe进行分组 A
. 然后,每组应按列排序 B
. 从我要选择的每个排序组中 n
行,即包含第一行的行 n
列中的不同值 C
. 这样做的操作链总体上应该像一个过滤器,即原始Dataframe的所有列都应该包含在输出Dataframe中。
鉴于 df
```
df = spark.createDataFrame([
# FIRST GROUP (5 distinct C)
# rows share [A, C]-combination
[1, 1, 2, 1],
[1, 2, 2, 2],
# rows share [A, B]-combination
[1, 3, 3, 3],
[1, 3, 4, 4],
# rows share [A, B, C]-combination
[1, 4, 4, 5],
[1, 4, 4, 6],
# rows share only A
[1, 5, 6, 7],
[1, 6, 7, 8],
# SECOND GROUP (1 distinct C)
# same A, B and C
[2, 1, 1, 9],
[2, 1, 1, 10],
], ["A", "B", "C", "D"])
的预期结果 `n=4` 是
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| 1| 1| 2| 1|
| 1| 3| 3| 3|
| 1| 3| 4| 4|
| 1| 5| 6| 7|
| 1| 6| 7| 8|
| 2| 1| 1| 9|
+---+---+---+---+
如果我们忽略了 `D` 解决这个问题的一个可能办法是
first_n_distinct = (
df
.groupBy("A", "C")
.agg(F.min("B").alias("B"))
.withColumn("rn", F.row_number().over(Window
.partitionBy("A")
.orderBy("B")))
.filter(F.col("rn") <= n)
)
到目前为止,要得到我所期望的结果,最好的方法就是过滤 `df` 基于中的[a,b,c]-组合 `first_n_distinct` 如果结果中存在重复行,请选择具有最小行数的行 `B` 价值观。
df_subset_with_duplicates = (
df
.join(first_n_distinct, on=["A", "B", "C"], how="left")
.filter(~F.isnull("rn"))
)
df_subset_first_n_distinct = (
df_subset_with_duplicates
.withColumn("rn2", F.row_number().over(Window
.partitionBy("A", "B", "C")
.orderBy("B")))
.filter(F.col("rn2") == 1)
.drop("rn2")
)
然而,对我来说,这似乎不是解决这个问题的最有效的方法(但至少它毕竟是一个解决方案!)。有人知道一个更好更容易阅读/理解的方法来做同样的事情吗?
1条答案
按热度按时间kg7wmglp1#
你可以试试看
dense_rank
过滤器后跟row_number
过滤器: