Removing columns inside a nested struct in Spark using PySpark (details in text)

bmp9r5qi · asked 2024-01-06 · in Spark
Follow (0) | Answers (2) | Views (248)

I know I asked a similar question here, but that was about row filtering. This time I am trying to drop columns. I tried to apply higher-order functions such as FILTER and others for a while, but could not get them to work. I think what I need is a SELECT higher-order function, but that does not seem to exist.
I am using PySpark and I have a nested DataFrame df; this is the output of df.printSchema():

```
root
 |-- M_MRN: string (nullable = true)
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Observation_ID: string (nullable = true)
 |    |    |-- Observation_Name: string (nullable = true)
 |    |    |-- Observation_Result: string (nullable = true)
```

I would like to keep only the 'Observation_ID' or 'Observation_Result' fields inside 'measurements'. Currently, df.select('measurements').take(2) gives me:

```
[Row(measurements=[Row(Observation_ID='5', Observation_Name='ABC', Observation_Result='108/72'),
                   Row(Observation_ID='11', Observation_Name='ABC', Observation_Result='70'),
                   Row(Observation_ID='10', Observation_Name='ABC', Observation_Result='73.029'),
                   Row(Observation_ID='14', Observation_Name='XYZ', Observation_Result='23.1')]),
 Row(measurements=[Row(Observation_ID='2', Observation_Name='ZZZ', Observation_Result='3/4'),
                   Row(Observation_ID='5', Observation_Name='ABC', Observation_Result='7')])]
```


I would like that, after doing the filtering above and running df.select('measurements').take(2), I get:

```
[Row(measurements=[Row(Observation_ID='5', Observation_Result='108/72'),
                   Row(Observation_ID='11', Observation_Result='70'),
                   Row(Observation_ID='10', Observation_Result='73.029'),
                   Row(Observation_ID='14', Observation_Result='23.1')]),
 Row(measurements=[Row(Observation_ID='2', Observation_Result='3/4'),
                   Row(Observation_ID='5', Observation_Result='7')])]
```


Is there a way to do this in PySpark?

pkmbmrz7 (answer 1)

You can use the higher-order function **transform** to select the fields you want and put them into a **struct**.

```python
from pyspark.sql import functions as F

df.withColumn("measurements", F.expr("""
    transform(measurements,
              x -> struct(x.Observation_ID as Observation_ID,
                          x.Observation_Result as Observation_Result))
""")).printSchema()

# root
#  |-- measurements: array (nullable = true)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- Observation_ID: string (nullable = true)
#  |    |    |-- Observation_Result: string (nullable = true)
```


hfwmuf9z (answer 2)

For anyone looking for an answer that works with older versions of PySpark, here is one using UDFs:

```python
import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

_measurement_type = ArrayType(StructType([
    StructField('Observation_ID', StringType(), True),
    StructField('Observation_Result', StringType(), True)
]))

@f.udf(returnType=_measurement_type)
def higher_order_select(measurements):
    return [(m.Observation_ID, m.Observation_Result) for m in measurements]

df.select(higher_order_select('measurements').alias('measurements')).printSchema()
```

which prints:

```
root
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Observation_ID: string (nullable = true)
 |    |    |-- Observation_Result: string (nullable = true)
```
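The UDF body itself is ordinary Python: given the list of Row structs in one cell, it keeps two fields of each element. A stdlib-only sketch of that per-row logic, using a namedtuple to stand in for Spark's Row (the names here are illustrative):

```python
from collections import namedtuple

# Stand-in for the Row objects Spark passes into the UDF.
Measurement = namedtuple(
    "Measurement", ["Observation_ID", "Observation_Name", "Observation_Result"]
)

def higher_order_select(measurements):
    # Keep only the two wanted fields of each struct; Spark maps the
    # returned tuples back onto the declared ArrayType(StructType(...)).
    return [(m.Observation_ID, m.Observation_Result) for m in measurements]

cell = [Measurement("5", "ABC", "108/72"), Measurement("11", "ABC", "70")]
print(higher_order_select(cell))  # [('5', '108/72'), ('11', '70')]
```

Note that because this runs as a Python UDF, each array is serialized to Python and back per row, so it is slower than the `transform` approach but works on Spark versions without higher-order SQL functions.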

