分组并分解pyspark数组类型列

n53p2ov0  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(504)

我有静态列表 group_1 以及 group_2 :

  1. group_1 = [a,b,c,d,e,f,g]
  2. group_2 = [h,i,j,k]

我有PyparkDataframe df1 如下所示。
例1:
df1型:

  1. +-----+----------------------------------------+-----------------------------------------+
  2. |id |array1 |array2 |
  3. +-----+----------------------------------------+-----------------------------------------+
  4. |id1 |[a,b,c,d,group_1,group_2] |[a,b,c,d,e,f,g,h,i,j,k] |
  5. +-----+----------------------------------------+-----------------------------------------+

输出功率因数:

  1. +-----+-------------------|-------------------|
  2. |id |col1 |col2 |
  3. +-----+-------------------|-------------------|
  4. |id1 |[a,b,c,d] |[a,b,c,d] |
  5. |id1 |[e,f,g] |group_1 |
  6. |id1 |[h,i,j,k] |group_2 |
  7. +-----+-------------------|-------------------|

事实上, array2 列将包含来自 array1 列。这就是我的源Dataframe( source_df1 )会的。
如果我们看到 array1 列中有单独的元素,如 (a,b,c,d) 还有 group_1 以及 group_2 但所有这些元素在一起是不同的。
现在我想通过分解这样一种方式来创建pysparkDataframe,即对单个元素和组元素进行分类,如中所示 output_df .
示例1观察:如果我们看到输出Dataframe output_df ,第二条记录 group_1 只有 [e,f,g] 因为其他元素已经是单个元素的一部分了。
例2:
来源:df1:

  1. +-----+----------------------------------------+-----------------------------------------+
  2. |id |array1 |array2 |
  3. +-----+----------------------------------------+-----------------------------------------+
  4. |id1 |[a,b,group_1,group_2] |[a,b,c,d,e,f,g,h,i,j,k] |
  5. +-----+----------------------------------------+-----------------------------------------+

输出功率因数:

  1. +-----+-------------------|-------------------|
  2. |id |col1 |col2 |
  3. +-----+-------------------|-------------------|
  4. |id1 |[a,b] |[a,b] |
  5. |id1 |[c,d,e,f,g] |group_1 |
  6. |id1 |[h,i,j,k] |group_2 |
  7. +-----+-------------------|-------------------|

示例2观察:如果我们看到输出Dataframe output_df . 第二条记录 group_1 只有 [c,d,e,f,g] 因为其他元素已经是单个元素的一部分了。
有谁能帮忙实现这个目标吗?

mrphzbgm

mrphzbgm1#

如果可以使用spark 2.4+,可以通过一些数组函数来实现:

  1. from pyspark.sql import functions as F
  2. df1 = df.withColumn(
  3. "individual",
  4. F.array_except(F.col("array1"), F.array(*[F.lit("group_1"), F.lit("group_2")]))
  5. ).withColumn(
  6. "group_1",
  7. F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
  8. ).withColumn(
  9. "group_2",
  10. F.array_except(F.array(*[F.lit(i) for i in group_2]), "individual")
  11. ).withColumn(
  12. "array2",
  13. F.explode(F.array(
  14. *[
  15. F.struct(F.array_intersect("array2", "individual").alias("col1"),
  16. F.col("individual").cast("string").alias("col2")),
  17. F.struct(F.array_intersect("array2", "group_1").alias("col1"),
  18. F.lit("group_1").alias("col2")),
  19. F.struct(F.array_intersect("array2", "group_2").alias("col1"),
  20. F.lit("group_2").alias("col2"))
  21. ])
  22. )
  23. ).select("id", "array2.*")
  24. df1.show(truncate=False)
  25. # +---+------------+------------+
  26. # |id |col1 |col2 |
  27. # +---+------------+------------+
  28. # |id1|[a, b, c, d]|[a, b, c, d]|
  29. # |id1|[e, f, g] |group_1 |
  30. # |id1|[h, i, j, k]|group_2 |
  31. # +---+------------+------------+

说明:
首先,划分 array1 分成三个阵列: individual , group_1 以及 group_2 . 每一个都包含对应组的元素。元素来自 group_1 以及 group_2 存在于 individual 从这些组中删除。
然后,使用 array_intersect 函数从中获取元素 array2 列,这些列分别存在于上面创建的三个组数组中。
最后,分解上面创建的新数组
请注意,如果要验证 group_1 或者 group_2 存在于 array1 可以使用的列 whenarray_contains 功能:

  1. F.when(
  2. F.array_contains(F.col("array1"), F.lit("group_1")),
  3. F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
  4. )

在这个例子中,我假设它总是出现在 array1 .

展开查看全部

相关问题