Parsing nested JSON in Spark

z8dt9xmd asked on 2021-07-13 in Spark

I have a JSON file with the following schema --

root
 |-- errorcode: string (nullable = true)
 |-- errormessage: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- label: string (nullable = true)
 |-- status: string (nullable = true)
 |-- storageidlist: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- errorcode: string (nullable = true)
 |    |    |-- errormessage: string (nullable = true)
 |    |    |-- fedirectorList: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- directorId: string (nullable = true)
 |    |    |    |    |-- errorcode: string (nullable = true)
 |    |    |    |    |-- errordesc: string (nullable = true)
 |    |    |    |    |-- metrics: string (nullable = true)
 |    |    |    |    |-- portMetricDataList: array (nullable = true)
 |    |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- data: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- ts: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- value: string (nullable = true)
 |    |    |    |    |    |    |    |-- errorcode: string (nullable = true)
 |    |    |    |    |    |    |    |-- errordesc: string (nullable = true)
 |    |    |    |    |    |    |    |-- metricid: string (nullable = true)
 |    |    |    |    |    |    |    |-- portid: string (nullable = true)
 |    |    |    |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |-- metrics: string (nullable = true)
 |    |    |-- status: string (nullable = true)
 |    |    |-- storageGroupList: string (nullable = true)
 |    |    |-- storageid: string (nullable = true)
 |-- sublabel: string (nullable = true)
 |-- ts: string (nullable = true)

I need to extract ip, storageid, directorId, metricid, value, and ts. There is only one element in storageidlist, but there are 56 in fedirectorList. However, I am unable to parse the JSON any deeper than storageidlist.

scala> val ip_df = spark.read.option("multiline",true).json("FEDirector_port_data.txt")
ip_df: org.apache.spark.sql.DataFrame = [errorcode: string, errormessage: string ... 6 more fields]

scala> ip_df.select($"storageidlist.storageid").show()
+--------------+
|     storageid|
+--------------+
|[000295700670]|
+--------------+

scala> ip_df.select($"storageidlist.fedirectorList.directorId").show()
org.apache.spark.sql.AnalysisException: cannot resolve '`storageidlist`.`fedirectorList`['directorId']' due to data type mismatch: argument 2 requires integral type, however, ''directorId'' is of string type.;;

bbuxkriu1#

storageidlist is an array column, so you need to select the first array element and make further selections from within it:

ip_df.selectExpr("storageidlist[0].fedirectorList.directorId")

or, equivalently, using the Column API:

ip_df.select($"storageidlist"(0).getField("fedirectorList").getField("directorId"))

When working with array-type columns, it is best to specify an array index. Without an index you can still go one level deeper and collect the corresponding struct field from every element, but you cannot go any further than that, which is exactly the error shown in the question.
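To go all the way down and extract every requested field (ip, storageid, directorId, metricid, value, ts) instead of indexing a single element, one option is to explode each array level in turn. The following is a minimal sketch against the schema above (the names flat_df, sil, fed, pmd, pm, and d are illustrative, not from the original answer); note that portMetricDataList is an array of arrays, so it takes two explode steps:

import org.apache.spark.sql.functions.explode

val flat_df = ip_df
  .select($"ip", explode($"storageidlist").as("sil"))                        // one row per storageidlist entry
  .select($"ip", $"sil.storageid", explode($"sil.fedirectorList").as("fed")) // one row per director (56 here)
  .select($"ip", $"storageid", $"fed.directorId",
          explode($"fed.portMetricDataList").as("pmd"))                      // outer array level
  .select($"ip", $"storageid", $"directorId", explode($"pmd").as("pm"))      // inner array level
  .select($"ip", $"storageid", $"directorId", $"pm.metricid",
          explode($"pm.data").as("d"))                                       // one row per (ts, value) sample
  .select($"ip", $"storageid", $"directorId", $"metricid", $"d.ts", $"d.value")

flat_df.show(false)

Note that explode drops rows whose array is null or empty; use explode_outer if those rows should be kept. In spark-shell the $ syntax works out of the box; in a standalone application add import spark.implicits._.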


zqry0prt2#

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
file = "<s3path>/<json_file_name.json>"
schema_path = "<s3path>/<json_schema_name.json>"
# Read a file that carries the full structure, then reuse its inferred schema below
json_schema = spark.read.json(schema_path, multiLine=True)
df = sqlContext.read.json(file, json_schema.schema, multiLine=True)

# display(df)

df.createOrReplaceTempView("temptable")

# example UDFs

def parse_nested_list(nested_list):
    # Flatten a two-level array (array of arrays) into a "|"-joined string
    parsed_str = []
    if nested_list:
        for item_list in nested_list:
            if item_list:
                for item in item_list:
                    if item:
                        parsed_str.append(item)
    return "|".join(parsed_str)

def parse_arrs(x):
    # Join inner elements with ", " and outer elements with "| ",
    # skipping None values at either level
    if x:
        return "| ".join(
            ", ".join(i for i in e if i is not None) for e in x if e is not None
        )
    else:
        return ""

sqlContext.udf.register("parse_nested_list", parse_nested_list)
sqlContext.udf.register("parse_arrs", parse_arrs)

structured_df = sqlContext.sql(
    "select parse_nested_list(column1.column2) as column3, parse_arrs(column1) as column2 from temptable"
)
display(structured_df)  # display() is a Databricks notebook helper

This handles nested arrays, lists, and dictionaries: you write a UDF that extracts the nested values and register it with PySpark so it can be called from Spark SQL.
