我在python中有一个sparkDataframe,它按照特定的顺序将行划分为正确的组,根据“start\u of \u section”列,该列的值为1或0。对于每个需要分组在一起的行集合,“value”和“start\u of \u section”之外的每一列都是相等的。我想将每个这样的集合分组到一行中,该行的其他每一列的值都相同,而列“list\u values”的每一行中都有一个值数组。
所以有些行可能看起来像:
Row(category=fruit, object=apple, value=60, start_of_section=1)
Row(category=fruit, object=apple, value=160, start_of_section=0)
Row(category=fruit, object=apple, value=30, start_of_section=0)
在新的Dataframe中
Row(category=fruit, object=apple, list_values=[60, 160, 30])
(编辑:请注意,列“start\u of \u section”不应包含在最终Dataframe中。)
我在试图研究答案时遇到的问题是,我只找到了按列值分组的方法,而不考虑顺序,这样会错误地产生两行,一个是将“start\u of \u section”=1的所有行分组,另一个是将“start\u of \u section”=0的所有行分组。。
什么代码可以实现这一点?
2条答案
按热度按时间bt1cpqcv1#
好吧,现在我明白了。你可以用求和
start_of_section
.为了确定结果,应该包括ordering列。
失败:
monotonically_increasing_id
当您有许多分区时失败。这完全不是我们想要的。
u0njafvf2#
假设您的订单列是
order_col
```df.show()
+--------+------+---------+----------------+-----+
|category|object|order_col|start_of_section|value|
+--------+------+---------+----------------+-----+
| fruit| apple| 1| 1| 60|
| fruit| apple| 2| 0| 160|
| fruit| apple| 3| 0| 30|
| fruit| apple| 4| 1| 50|
+--------+------+---------+----------------+-----+
from pyspark.sql import functions as F, Window as W
df.withColumn(
"id",
F.sum("start_of_section").over(
W.partitionBy("category", "object").orderBy("order_col")
),
).groupBy("category", "object", "id").agg(F.collect_list("value").alias("values")).drop(
"id"
).show()
+--------+------+-------------+
|category|object| values|
+--------+------+-------------+
| fruit| apple|[60, 160, 30]|
| fruit| apple| [50]|
+--------+------+-------------+