Parsing nested JSON in Spark

Posted by z8dt9xmd on 2021-07-13 in Spark

I have a JSON file with the following schema:

    root
     |-- errorcode: string (nullable = true)
     |-- errormessage: string (nullable = true)
     |-- ip: string (nullable = true)
     |-- label: string (nullable = true)
     |-- status: string (nullable = true)
     |-- storageidlist: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- errorcode: string (nullable = true)
     |    |    |-- errormessage: string (nullable = true)
     |    |    |-- fedirectorList: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- directorId: string (nullable = true)
     |    |    |    |    |-- errorcode: string (nullable = true)
     |    |    |    |    |-- errordesc: string (nullable = true)
     |    |    |    |    |-- metrics: string (nullable = true)
     |    |    |    |    |-- portMetricDataList: array (nullable = true)
     |    |    |    |    |    |-- element: array (containsNull = true)
     |    |    |    |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |    |    |    |-- data: array (nullable = true)
     |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |    |    |    |    |    |-- ts: string (nullable = true)
     |    |    |    |    |    |    |    |    |    |-- value: string (nullable = true)
     |    |    |    |    |    |    |    |-- errorcode: string (nullable = true)
     |    |    |    |    |    |    |    |-- errordesc: string (nullable = true)
     |    |    |    |    |    |    |    |-- metricid: string (nullable = true)
     |    |    |    |    |    |    |    |-- portid: string (nullable = true)
     |    |    |    |    |    |    |    |-- status: string (nullable = true)
     |    |    |    |    |-- status: string (nullable = true)
     |    |    |-- metrics: string (nullable = true)
     |    |    |-- status: string (nullable = true)
     |    |    |-- storageGroupList: string (nullable = true)
     |    |    |-- storageid: string (nullable = true)
     |-- sublabel: string (nullable = true)
     |-- ts: string (nullable = true)

I need to extract ip, storageid, directorId, metricid, value and ts. storageidlist contains only 1 item, but fedirectorList contains 56 items. However, I can't parse the JSON any deeper than storageidlist:

    scala> val ip_df = spark.read.option("multiline",true).json("FEDirector_port_data.txt")
    ip_df: org.apache.spark.sql.DataFrame = [errorcode: string, errormessage: string ... 6 more fields]

    scala> ip_df.select($"storageidlist.storageid").show()
    +--------------+
    |     storageid|
    +--------------+
    |[000295700670]|
    +--------------+

    scala> ip_df.select($"storageidlist.fedirectorList.directorId").show()
    org.apache.spark.sql.AnalysisException: cannot resolve '`storageidlist`.`fedirectorList`['directorId']' due to data type mismatch: argument 2 requires integral type, however, ''directorId'' is of string type.;;
Answer #1 (bbuxkriu)

storageidlist is an array column, so you need to select its first element and drill down from there:

    ip_df.selectExpr("storageidlist[0].fedirectorList.directorId")

or, equivalently, with the Column API:

    ip_df.select($"storageidlist"(0).getField("fedirectorList").getField("directorId"))

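When working with array-type columns, it's best to specify an array index. Without one, you can go one level deeper and get the corresponding struct field from every element at that level, but no further, as the question shows: storageidlist.fedirectorList is already an array of arrays, so Spark treats the following .directorId as an array lookup that needs an integer index, hence the "argument 2 requires integral type" error.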

Answer #2 (zqry0prt)

    from pyspark.sql import SparkSession

    # SparkSession is the current entry point (replaces SQLContext(sc))
    spark = SparkSession.builder.getOrCreate()

    file = "<s3path>/<json_file_name.json>"
    schema_path = "<s3path>/<json_schema_name.json>"

    # Infer the schema from a sample file, then apply it when reading the data
    json_schema = spark.read.json(schema_path, multiLine=True)
    df = spark.read.json(file, schema=json_schema.schema, multiLine=True)
    df.createOrReplaceTempView("temptable")

    # example UDF: joins the non-empty items of a list of lists with "|"
    def parse_nested_list(nested_list):
        parsed_str = []
        if nested_list:
            for item_list in nested_list:
                if item_list:
                    for item in item_list:
                        if item:
                            parsed_str.append(str(item))
        return "|".join(parsed_str)

    # example UDF: joins each inner list with ", " and the lists with "| "
    def parse_arrs(x):
        if x:
            return "| ".join(
                ", ".join(str(i) for i in e if i is not None)
                for e in x
                if e is not None
            )
        return ""

    # Registered without a returnType, so both UDFs return strings
    spark.udf.register("parse_nested_list", parse_nested_list)
    spark.udf.register("parse_arrs", parse_arrs)

    # column1, column2 and column3 are placeholders for your own column names
    structured_df = spark.sql(
        "select parse_nested_list(column1.column2) as column3, "
        "parse_arrs(column1) as column2 from temptable"
    )
    structured_df.show()  # or display(structured_df) in a Databricks notebook

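To get at nested arrays, lists and dicts this way, you write a UDF that extracts the nested values and register it with PySpark so that it can be called from Spark SQL.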
