How to explode an array column in PySpark to generate a Boolean column

r55awzrz  posted on 2022-11-16 in Apache

I have a dataframe like this:

+------------+-----------------+------------------------------------+
| Name       |   Age           | Answers                            |
+------------+-----------------+------------------------------------+
| Maria      | 23              | [apple, mango, orange, banana]     | 
| John       | 55              | [apple, orange, banana]            |
| Brad       | 44              | [banana]                           |
| Alex       | 55              | [apple, mango, orange, banana]     |
+------------+-----------------+------------------------------------+

The "Answers" column holds an array of elements.
My expected output:

+-----+---+--------+-------+                                                              
| Name|Age|  answer| value |
+-----+---+--------+-------+
|Maria| 23|   apple| True  |
|Maria| 23|   mango| True  |
|Maria| 23|  orange| True  |
|Maria| 23|  banana| True  |
| John| 55|   apple| True  |
| John| 55|   mango| False |
| John| 55|  orange| True  |
| John| 55|  banana| True  |
| Brad| 44|   apple| False |
| Brad| 44|   mango| False |
| Brad| 44|  orange| False |
| Brad| 44|  banana| True  |
|Alex | 55|   apple| True  |
|Alex | 55|   mango| True  |
|Alex | 55|  orange| True  |
|Alex | 55|  banana| True  |
+-----+---+--------+-------+

How can I explode the "Answers" column so that the "value" column is True or False depending on the array?
For example,

| John| 55|   mango| False |

John's answers do not contain "mango", so the value is False. Similarly, Brad will have three False rows.
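In other words, the desired result is a cross product of every row with the set of all distinct answers, plus a membership test per (row, answer) pair. A minimal plain-Python sketch of that logic, with no Spark involved (names are illustrative):

```python
rows = [
    ("Maria", 23, ["apple", "mango", "orange", "banana"]),
    ("John", 55, ["apple", "orange", "banana"]),
    ("Brad", 44, ["banana"]),
    ("Alex", 55, ["apple", "mango", "orange", "banana"]),
]

# All distinct answers across every row
all_answers = sorted({a for _, _, answers in rows for a in answers})

# Cross every row with every distinct answer; value = membership test
result = [
    (name, age, answer, answer in answers)
    for name, age, answers in rows
    for answer in all_answers
]

for r in result:
    print(r)
```

This produces 4 rows × 4 answers = 16 output rows; both answers below implement the same cross-then-test idea with Spark column functions.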


u3r8eeie1#

You can collect all possible values of the "Answers" column before exploding. Add them to the dataframe, explode, and select the required columns.
Input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('Maria', 23, ['apple', 'mango', 'orange', 'banana']),
     ('John', 55, ['apple', 'orange', 'banana']),
     ('Brad', 44, ['banana']),
     ('Alex', 55, ['apple', 'mango', 'orange', 'banana'])],
    ['Name', 'Age', 'Answers'])

Script:

# Collect every distinct answer across all rows
unique_answers = set(df.agg(F.flatten(F.collect_set('Answers'))).head()[0])
# Attach the full answer list to each row and explode it
df = df.withColumn('answer', F.explode(F.array([F.lit(x) for x in unique_answers])))
# 'value' is True if the exploded answer is present in the row's original array
df = df.select(
    'Name', 'Age', 'answer',
    F.exists('Answers', lambda x: x == F.col('answer')).alias('value'),
    *[c for c in df.columns if c not in {'Name', 'Age', 'Answers', 'answer'}]
)
df.show()
# +-----+---+------+-----+
# | Name|Age|answer|value|
# +-----+---+------+-----+
# |Maria| 23|orange| true|
# |Maria| 23| mango| true|
# |Maria| 23| apple| true|
# |Maria| 23|banana| true|
# | John| 55|orange| true|
# | John| 55| mango|false|
# | John| 55| apple| true|
# | John| 55|banana| true|
# | Brad| 44|orange|false|
# | Brad| 44| mango|false|
# | Brad| 44| apple|false|
# | Brad| 44|banana| true|
# | Alex| 55|orange| true|
# | Alex| 55| mango| true|
# | Alex| 55| apple| true|
# | Alex| 55|banana| true|
# +-----+---+------+-----+
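A small aside on the `value` expression above: `F.exists('Answers', lambda x: x == F.col('answer'))` is just a membership test, so in recent Spark versions (where `array_contains` accepts a column as the value to search for) `F.array_contains('Answers', F.col('answer'))` should behave the same for non-null data. In plain Python, both reduce to the following (a sketch modeling the two functions, not Spark code):

```python
def exists(arr, pred):
    """Model of F.exists: True if any element satisfies the predicate."""
    return any(pred(x) for x in arr)

def array_contains(arr, value):
    """Model of F.array_contains: True if value is an element of arr."""
    return value in arr

answers = ["apple", "orange", "banana"]  # John's row
print(exists(answers, lambda x: x == "mango"))   # membership via predicate
print(array_contains(answers, "mango"))          # direct membership
```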

zbdgwd5y2#

One approach uses the `transform` and `arrays_zip` functions:

from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

data_sdf. \
    withColumnRenamed('answers', 'ans_per_name'). \
    withColumn('answers',
               func.array_distinct(func.flatten(func.collect_set('ans_per_name').over(wd.partitionBy())))
               ). \
    withColumn('value',
               func.expr('transform(answers, x -> array_contains(ans_per_name, x))')
               ). \
    withColumn('ans_val_struct', func.arrays_zip('answers', 'value')). \
    selectExpr('name', 'age', 'inline(ans_val_struct)'). \
    show(truncate=False)

# +-----+---+-------+-----+
# |name |age|answers|value|
# +-----+---+-------+-----+
# |Maria|23 |apple  |true |
# |Maria|23 |orange |true |
# |Maria|23 |banana |true |
# |Maria|23 |mango  |true |
# |John |55 |apple  |true |
# |John |55 |orange |true |
# |John |55 |banana |true |
# |John |55 |mango  |false|
# |Brad |44 |apple  |false|
# |Brad |44 |orange |false|
# |Brad |44 |banana |true |
# |Brad |44 |mango  |false|
# |Alex |55 |apple  |true |
# |Alex |55 |orange |true |
# |Alex |55 |banana |true |
# |Alex |55 |mango  |true |
# +-----+---+-------+-----+
  • The idea is to get all the answers for every name. collect_set along with flatten and array_distinct does that.
  • transform checks each collected answer against the per-name answer array from before. If the element is present, it is marked True.
  • arrays_zip zips the 2 arrays to create an array of structs, where the Nth struct has the Nth element from each array.
  • The inline SQL function helps explode the structs and create new columns from the struct fields.
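The three array steps in the bullets above can be modeled in plain Python for one row (John); the Spark column names come from the answer, the rest is illustrative:

```python
answers = ["apple", "orange", "banana", "mango"]   # all distinct answers
ans_per_name = ["apple", "orange", "banana"]       # John's own answers

# transform(answers, x -> array_contains(ans_per_name, x))
value = [a in ans_per_name for a in answers]

# arrays_zip('answers', 'value') -> array of (answer, value) structs
ans_val_struct = list(zip(answers, value))

# inline(ans_val_struct) -> one output row per struct
for answer, v in ans_val_struct:
    print(answer, v)
```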
