从Pyspark动态帧中的结构化字段过滤空值

xuo3flqw  于 2023-03-17  发布在  Spark
关注(0)|答案(1)|浏览(172)

我有一个非常嵌套的json,在只过滤了我想处理的类别字段之后,我的列'data'就剩下了和原始dataframe相同的复杂结构/模式:

  1. +--------------------+...
  2. | data|
  3. +--------------------+...
  4. |{null, null, 833,...|...

并且数据结构包括大约250个嵌套字段,其中95%是空的。
我的目标是将数据列转换为仅包含非空字段的聚合,理想情况下仅包含非空字段的子集,但结构化模式是从读取数据时继承的,而且似乎找不到可以再次重新创建模式的方法。
我尝试过的方法:

  1. filtered4.filter(f.col('data').isNotNull()) / isNull()但这会清除整行/不做任何事情。
  2. ws_concat和coalesce()类似于:df.withColumn("data", concat_ws(", ", coalesce(col("data.street"), ""), coalesce(col("data.city.neighborhood"), ""), coalesce(col("address.state"), ""))) ...,但这不是一个选项,因为我有100个字段。
    1.将“data”列转换为字符串,然后使用regex进行清理,但这样做会丢失希望保留的字段的结构/名称
    对于上下文,这些是动态的结构化字段(例如网站的请求标题),并且根据数据的不同而改变结构,因此我想把它放在一个地方,并且只在我想要的时候访问它。我认为最好保持它为一个字符串类型,没有所有这些空值(“{null,null,833,..”),但是我非常愿意听取人群中更有经验的pysparker的建议。
c90pui9n

c90pui9n1#

关于您尝试的第二个选项(以及其中的难点),您可以尝试以下方法来收集struct列中的所有字段。

  1. import typing
  2. import pyspark.sql.functions as F
  3. from pyspark.sql import SparkSession
  4. from pyspark.sql.dataframe import DataFrame
  5. from pyspark.sql.types import StructType, StructField, StringType, IntegerType
  6. def construct(prefix: str, fields: typing.List) -> typing.List:
  7. '''
  8. Recursive function to find all nested fields in the struct field
  9. '''
  10. # List holding all nested fields found in this call
  11. found_fields = []
  12. # Loop over the list of fields passed as an argument
  13. for field in fields:
  14. if isinstance(field.dataType, StructType):
  15. found_fields += \
  16. [
  17. prefix+'.'+item for item in
  18. construct(field.name, field.dataType.fields)
  19. ]
  20. else:
  21. found_fields.append(prefix+'.'+field.name)
  22. return found_fields
  23. def all_nested_fields(df: DataFrame, col: str) -> typing.List[str]:
  24. '''
  25. Collect all nested fields in a struct column in a Spark data frame
  26. '''
  27. if isinstance(df.schema[col].dataType, StructType):
  28. print('%s is a struct column. Finding all nested fields' % (col, ))
  29. return construct(col, schema[col].dataType.fields)
  30. else:
  31. print('%s is not a struct column.' % (col, ))
  32. spark = SparkSession.builder.getOrCreate()
  33. data = [
  34. ((((("Iran", 30)),), "Sajad", "Safarveisi"), "Tehran", "Persian Golf"),
  35. ((((("USA", 40)),), "James", "Baker"), "Washington", "Wall street"),
  36. ((((("Germany", 25)),), "Patrick", "Gottmann"), "Colon", "Black forest")]
  37. schema = StructType([
  38. StructField("name", StructType([
  39. StructField("details", StructType([
  40. StructField("nationality", StructType([
  41. StructField("country", StringType(), True),
  42. StructField("age", IntegerType(), True)
  43. ]), False)]), False),
  44. StructField("firstname", StringType(), True),
  45. StructField("lastname", StringType(), True),
  46. ]), False),
  47. StructField("city", StringType(), True),
  48. StructField("attribute", StringType(), True)])
  49. df = spark.createDataFrame(data=data, schema=schema)
  50. # All nested fields in the column 'name'
  51. nested_fields = all_nested_fields(df, 'name')
  52. # Create a spark data frame from them (example operation)
  53. df.select(*[F.col(field) for field in nested_fields]).show()
展开查看全部

相关问题