python—将pyspark dataframe按a分组,按b排序,然后在c中选择前n个不同的条目

mf98qq94  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(318)

我想按列对给定的Dataframe进行分组 A . 然后,每组应按列排序 B . 从我要选择的每个排序组中 n 行,即包含第一行的行 n 列中的不同值 C . 这样做的操作链总体上应该像一个过滤器,即原始Dataframe的所有列都应该包含在输出Dataframe中。
鉴于 df ```
df = spark.createDataFrame([
# FIRST GROUP (5 distinct C)
# rows share [A, C]-combination
[1, 1, 2, 1],
[1, 2, 2, 2],
# rows share [A, B]-combination
[1, 3, 3, 3],
[1, 3, 4, 4],
# rows share [A, B, C]-combination
[1, 4, 4, 5],
[1, 4, 4, 6],
# rows share only A
[1, 5, 6, 7],
[1, 6, 7, 8],
# SECOND GROUP (1 distinct C)
# same A, B and C
[2, 1, 1, 9],
[2, 1, 1, 10],
], ["A", "B", "C", "D"])

的预期结果 `n=4` 是

+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
| 1| 1| 2| 1|
| 1| 3| 3| 3|
| 1| 3| 4| 4|
| 1| 5| 6| 7|
| 1| 6| 7| 8|
| 2| 1| 1| 9|
+---+---+---+---+

如果我们忽略了 `D` 解决这个问题的一个可能办法是

first_n_distinct = (
df
.groupBy("A", "C")
.agg(F.min("B").alias("B"))
.withColumn("rn", F.row_number().over(Window
.partitionBy("A")
.orderBy("B")))
.filter(F.col("rn") <= n)
)

到目前为止,要得到我所期望的结果,最好的方法就是过滤 `df` 基于中的[a,b,c]-组合 `first_n_distinct` 如果结果中存在重复行,请选择具有最小行数的行 `B` 价值观。

df_subset_with_duplicates = (
df
.join(first_n_distinct, on=["A", "B", "C"], how="left")
.filter(~F.isnull("rn"))
)
df_subset_first_n_distinct = (
df_subset_with_duplicates
.withColumn("rn2", F.row_number().over(Window
.partitionBy("A", "B", "C")
.orderBy("B")))
.filter(F.col("rn2") == 1)
.drop("rn2")
)

然而,对我来说,这似乎不是解决这个问题的最有效的方法(但至少它毕竟是一个解决方案!)。有人知道一个更好更容易阅读/理解的方法来做同样的事情吗?
kg7wmglp

kg7wmglp1#

你可以试试看 dense_rank 过滤器后跟 row_number 过滤器:

from pyspark.sql import functions as F, Window

n = 4

df2 = df.withColumn(
    'rn', 
    F.dense_rank().over(Window.partitionBy('A').orderBy('C'))
).filter(F.col('rn') <= n).withColumn(
    'rn', 
    F.row_number().over(Window.partitionBy('A', 'C').orderBy('B'))
).filter('rn = 1').drop('rn')

df2.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  1|  1|  2|  1|
|  1|  3|  3|  3|
|  1|  3|  4|  4|
|  1|  5|  6|  7|
|  2|  1|  1|  9|
+---+---+---+---+

相关问题