分组并分解pyspark数组类型列

n53p2ov0  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(480)

我有静态列表 group_1 以及 group_2 :

group_1 = [a,b,c,d,e,f,g]
group_2 = [h,i,j,k]

我有PyparkDataframe df1 如下所示。
例1:
df1型:

+-----+----------------------------------------+-----------------------------------------+
|id   |array1                                  |array2                                   |
+-----+----------------------------------------+-----------------------------------------+
|id1  |[a,b,c,d,group_1,group_2]               |[a,b,c,d,e,f,g,h,i,j,k]                  |
+-----+----------------------------------------+-----------------------------------------+

输出功率因数:

+-----+-------------------|-------------------|
|id   |col1               |col2               |
+-----+-------------------|-------------------|
|id1  |[a,b,c,d]          |[a,b,c,d]          |
|id1  |[e,f,g]            |group_1            |
|id1  |[h,i,j,k]          |group_2            |
+-----+-------------------|-------------------|

事实上, array2 列将包含来自 array1 列。这就是我的源Dataframe( source_df1 )会的。
如果我们看到 array1 列中有单独的元素,如 (a,b,c,d) 还有 group_1 以及 group_2 但所有这些元素在一起是不同的。
现在我想通过分解这样一种方式来创建pysparkDataframe,即对单个元素和组元素进行分类,如中所示 output_df .
示例1观察:如果我们看到输出Dataframe output_df ,第二条记录 group_1 只有 [e,f,g] 因为其他元素已经是单个元素的一部分了。
例2:
来源:df1:

+-----+----------------------------------------+-----------------------------------------+
|id   |array1                                  |array2                                   |
+-----+----------------------------------------+-----------------------------------------+
|id1  |[a,b,group_1,group_2]                   |[a,b,c,d,e,f,g,h,i,j,k]                  |
+-----+----------------------------------------+-----------------------------------------+

输出功率因数:

+-----+-------------------|-------------------|
|id   |col1               |col2               |
+-----+-------------------|-------------------|
|id1  |[a,b]              |[a,b]              |
|id1  |[c,d,e,f,g]        |group_1            |
|id1  |[h,i,j,k]          |group_2            |
+-----+-------------------|-------------------|

示例2观察:如果我们看到输出Dataframe output_df . 第二条记录 group_1 只有 [c,d,e,f,g] 因为其他元素已经是单个元素的一部分了。
有谁能帮忙实现这个目标吗?

mrphzbgm

mrphzbgm1#

如果可以使用spark 2.4+,可以通过一些数组函数来实现:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "individual",
    F.array_except(F.col("array1"), F.array(*[F.lit("group_1"), F.lit("group_2")]))
).withColumn(
    "group_1",
    F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
).withColumn(
    "group_2",
    F.array_except(F.array(*[F.lit(i) for i in group_2]), "individual")
).withColumn(
    "array2",
    F.explode(F.array(
        *[
            F.struct(F.array_intersect("array2", "individual").alias("col1"),
                     F.col("individual").cast("string").alias("col2")),
            F.struct(F.array_intersect("array2", "group_1").alias("col1"),
                     F.lit("group_1").alias("col2")),
            F.struct(F.array_intersect("array2", "group_2").alias("col1"),
                     F.lit("group_2").alias("col2"))
        ])
    )
).select("id", "array2.*")

df1.show(truncate=False)

# +---+------------+------------+

# |id |col1        |col2        |

# +---+------------+------------+

# |id1|[a, b, c, d]|[a, b, c, d]|

# |id1|[e, f, g]   |group_1     |

# |id1|[h, i, j, k]|group_2     |

# +---+------------+------------+

说明:
首先,划分 array1 分成三个阵列: individual , group_1 以及 group_2 . 每一个都包含对应组的元素。元素来自 group_1 以及 group_2 存在于 individual 从这些组中删除。
然后,使用 array_intersect 函数从中获取元素 array2 列,这些列分别存在于上面创建的三个组数组中。
最后,分解上面创建的新数组
请注意,如果要验证 group_1 或者 group_2 存在于 array1 可以使用的列 whenarray_contains 功能:

F.when(
    F.array_contains(F.col("array1"), F.lit("group_1")),
    F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
)

在这个例子中,我假设它总是出现在 array1 .

相关问题