I have a JSON file whose schema looks like this --
root
|-- errorcode: string (nullable = true)
|-- errormessage: string (nullable = true)
|-- ip: string (nullable = true)
|-- label: string (nullable = true)
|-- status: string (nullable = true)
|-- storageidlist: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- errorcode: string (nullable = true)
| | |-- errormessage: string (nullable = true)
| | |-- fedirectorList: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- directorId: string (nullable = true)
| | | | |-- errorcode: string (nullable = true)
| | | | |-- errordesc: string (nullable = true)
| | | | |-- metrics: string (nullable = true)
| | | | |-- portMetricDataList: array (nullable = true)
| | | | | |-- element: array (containsNull = true)
| | | | | | |-- element: struct (containsNull = true)
| | | | | | | |-- data: array (nullable = true)
| | | | | | | | |-- element: struct (containsNull = true)
| | | | | | | | | |-- ts: string (nullable = true)
| | | | | | | | | |-- value: string (nullable = true)
| | | | | | | |-- errorcode: string (nullable = true)
| | | | | | | |-- errordesc: string (nullable = true)
| | | | | | | |-- metricid: string (nullable = true)
| | | | | | | |-- portid: string (nullable = true)
| | | | | | | |-- status: string (nullable = true)
| | | | |-- status: string (nullable = true)
| | |-- metrics: string (nullable = true)
| | |-- status: string (nullable = true)
| | |-- storageGroupList: string (nullable = true)
| | |-- storageid: string (nullable = true)
|-- sublabel: string (nullable = true)
|-- ts: string (nullable = true)
I need to extract ip, storageid, directorId, metricid, value and ts. storageidlist contains only 1 item, but fedirectorList contains 56 items. However, I cannot parse the JSON beyond storageidlist.
scala> val ip_df = spark.read.option("multiline",true).json("FEDirector_port_data.txt")
ip_df: org.apache.spark.sql.DataFrame = [errorcode: string, errormessage: string ... 6 more fields]
scala> ip_df.select($"storageidlist.storageid").show()
+--------------+
| storageid|
+--------------+
|[000295700670]|
+--------------+
scala> ip_df.select($"storageidlist.fedirectorList.directorId").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`storageidlist`.`fedirectorList`['directorId']' due to data type mismatch: argument 2 requires integral type, however, ''directorId'' is of string type.;;
2 Answers
bbuxkriu1#
storageidlist is an array column, so you need to select a single array element and then select further fields from it, or flatten the nesting away entirely; see the sketch below.
When working with array-type columns, it is best to specify an array index. Without one, you can go one level deeper and pull out all the corresponding struct elements at the next level, but no further, which is exactly the error shown in the question.
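A minimal sketch of both approaches, assuming the schema above; the column and field names are taken from the question, and the explode/flatten variant assumes Spark 2.4+ (flatten is needed because portMetricDataList is an array of arrays):

import org.apache.spark.sql.functions.{explode, flatten}

// Approach 1: index into storageidlist (it has only one element),
// then extract directorId from every fedirectorList element at once.
ip_df.select($"storageidlist"(0)("fedirectorList")("directorId")).show(false)

// Approach 2: explode each array level down to individual data points,
// ending with the ip/storageid/directorId/metricid/value/ts columns asked for.
val flat_df = ip_df
  .select($"ip", explode($"storageidlist").as("s"))
  .select($"ip", $"s.storageid", explode($"s.fedirectorList").as("d"))
  .select($"ip", $"storageid", $"d.directorId",
          explode(flatten($"d.portMetricDataList")).as("p"))
  .select($"ip", $"storageid", $"directorId", $"p.metricid",
          explode($"p.data").as("dp"))
  .select($"ip", $"storageid", $"directorId", $"metricid", $"dp.value", $"dp.ts")

flat_df.show(false)

Approach 2 yields one row per (port, metric, timestamp) combination. Note that ts here is taken from the inner data element; swap in the top-level $"ts" if that is the timestamp you need.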
zqry0prt2#
To get at nested arrays, lists and dictionaries, you have to write a UDF that extracts the nested values and register it with PySpark so that it can be used in Spark SQL code.
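This answer mentions PySpark, but since the question uses the Scala shell, here is a minimal sketch of the same UDF idea in Scala. It assumes Spark's usual behavior of passing array-of-struct columns to Scala UDFs as Seq[Row]; extracting only directorId is an illustrative choice:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Collect every directorId across the nested storageidlist/fedirectorList arrays.
val directorIds = udf { (storageidlist: Seq[Row]) =>
  storageidlist.flatMap { s =>
    s.getAs[Seq[Row]]("fedirectorList").map(_.getAs[String]("directorId"))
  }
}

ip_df.select(directorIds($"storageidlist").as("directorIds")).show(false)

For this particular schema, though, the built-in explode/getField route from the first answer avoids UDF serialization overhead and keeps the query optimizable by Catalyst.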