如何在PySpark中将array(struct(array))扁平化为array(struct)?

brvekthn  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(188)

DataFrame列包含嵌套数组,嵌套结构位于数组之间(array(struct(array):

  1. root
  2. |-- a: integer (nullable = true)
  3. |-- list: array (nullable = true)
  4. | |-- element: struct (containsNull = true)
  5. | | |-- b: integer (nullable = true)
  6. | | |-- sub_list: array (nullable = true)
  7. | | | |-- element: struct (containsNull = true)
  8. | | | | |-- c: integer (nullable = true)
  9. | | | | |-- foo: string (nullable = true)

字符串
我想把schema简化为一个struct数组:

  1. root
  2. |-- a: integer (nullable = true)
  3. |-- list: array (nullable = true)
  4. | |-- element: struct (containsNull = true)
  5. | | |-- b: integer (nullable = true)
  6. | | |-- c: integer (nullable = true)
  7. | | |-- foo: string (nullable = true)


我用的是Spark 3.5.0。
我尝试了flatten,inline,explode,transform等,但没有任何成功。
更新:我发现了以下解决方案,适用于这个简化的示例:

  1. df.select("a", inline("list")) \
  2. .select(expr("*"), inline("sub_list")) \
  3. .drop("sub_list") \
  4. .groupBy("a") \
  5. .agg(collect_list(struct("b", "c", "foo")).alias("list"))


但在更复杂的场景中,这会变得更复杂,因为似乎我必须将所有内容自上而下分解(“升级”)到行级别。
我更喜欢下面这样的东西(这会抛出一个错误):

  1. df.withColumn("list", transform(col("list"), lambda x: x.withField("sub_list", explode(x.getField("sub_list")))))


换句话说:我想将内部数组分解为外部数组(自下而上)。

e4yzc0pl

e4yzc0pl1#

我想我明白了:

  1. df.withColumn(
  2. "list",
  3. flatten(
  4. transform(
  5. col("list"),
  6. lambda x: transform(
  7. x.getField("sub_list"),
  8. lambda y: struct(x.getField("b"), y.getField("c"), y.getField("foo")),
  9. ),
  10. )
  11. ),
  12. )

字符串
其逻辑是:
1.使用嵌套转换将列B从外部数组移动到内部数组
1.将转换后的数组展平为单个数组

展开查看全部

相关问题