pyspark:按布尔列对连续行进行分组

qlzsbp2j  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(485)

我在python中有一个sparkDataframe,它按照特定的顺序将行划分为正确的组,根据“start\u of \u section”列,该列的值为1或0。对于每个需要分组在一起的行集合,“value”和“start\u of \u section”之外的每一列都是相等的。我想将每个这样的集合分组到一行中,该行的其他每一列的值都相同,而列“list\u values”的每一行中都有一个值数组。
所以有些行可能看起来像:

Row(category=fruit, object=apple, value=60, start_of_section=1)
Row(category=fruit, object=apple, value=160, start_of_section=0)
Row(category=fruit, object=apple, value=30, start_of_section=0)

在新的Dataframe中

Row(category=fruit, object=apple, list_values=[60, 160, 30])

(编辑:请注意,列“start\u of \u section”不应包含在最终Dataframe中。)
我在试图研究答案时遇到的问题是,我只找到了按列值分组的方法,而不考虑顺序,这样会错误地产生两行,一个是将“start\u of \u section”=1的所有行分组,另一个是将“start\u of \u section”=0的所有行分组。。
什么代码可以实现这一点?

bt1cpqcv

bt1cpqcv1#

好吧,现在我明白了。你可以用求和 start_of_section .
为了确定结果,应该包括ordering列。

from pyspark.sql.types import Row
from pyspark.sql.functions import *
from pyspark.sql import Window

data = [Row(category='fruit', object='apple', value=60, start_of_section=1),
    Row(category='fruit', object='apple', value=160, start_of_section=0),
    Row(category='fruit', object='apple', value=30, start_of_section=0),
    Row(category='fruit', object='apple', value=50, start_of_section=1),
    Row(category='fruit', object='apple', value=30, start_of_section=0),
    Row(category='fruit', object='apple', value=60, start_of_section=1),
    Row(category='fruit', object='apple', value=110, start_of_section=0)]

df = spark.createDataFrame(data)

w = Window.partitionBy('category', 'object').rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn('group', sum('start_of_section').over(w)) \
  .groupBy('category', 'object', 'group').agg(collect_list('value').alias('list_value')) \
  .drop('group').show()

+--------+------+-------------+
|category|object|   list_value|
+--------+------+-------------+
|   fruit| apple|[60, 160, 30]|
|   fruit| apple|     [50, 30]|
|   fruit| apple|    [60, 110]|
+--------+------+-------------+

失败: monotonically_increasing_id 当您有许多分区时失败。

df.repartition(7) \
  .withColumn('id', monotonically_increasing_id()) \
  .withColumn('group', sum('start_of_section').over(w)) \
  .groupBy('category', 'object', 'group').agg(collect_list('value').alias('list_value')) \
  .drop('group').show()

+--------+------+--------------------+
|category|object|          list_value|
+--------+------+--------------------+
|   fruit| apple|                [60]|
|   fruit| apple|[60, 160, 30, 30,...|
|   fruit| apple|                [50]|
+--------+------+--------------------+

这完全不是我们想要的。

u0njafvf

u0njafvf2#

假设您的订单列是 order_col ```
df.show()
+--------+------+---------+----------------+-----+
|category|object|order_col|start_of_section|value|
+--------+------+---------+----------------+-----+
| fruit| apple| 1| 1| 60|
| fruit| apple| 2| 0| 160|
| fruit| apple| 3| 0| 30|
| fruit| apple| 4| 1| 50|
+--------+------+---------+----------------+-----+

您需要生成一个id来将同一节中的行分组在一起,然后根据该id和所需的维度进行分组。这是你怎么做的。

from pyspark.sql import functions as F, Window as W

df.withColumn(
"id",
F.sum("start_of_section").over(
W.partitionBy("category", "object").orderBy("order_col")
),
).groupBy("category", "object", "id").agg(F.collect_list("value").alias("values")).drop(
"id"
).show()

+--------+------+-------------+
|category|object| values|
+--------+------+-------------+
| fruit| apple|[60, 160, 30]|
| fruit| apple| [50]|
+--------+------+-------------+

编辑:如果没有 `order_col` ,这是一项不可能完成的任务。将Dataframe中的行视为包中的大理石。他们没有任何订单。你可以在把它们从袋子里拿出来的时候根据一些标准来订购它们,否则,你就不能接受任何订单。 `show` 就是你从袋子里拿出10颗弹珠(线)。每次你做的时候顺序可能是一样的,但是突然改变了,你无法控制它

相关问题