在spark中使用自定义模式阅读json文件不返回结果

epggiuax  于 2023-04-21  发布在  Apache
关注(0)|答案(1)|浏览(93)

我是emr/hdfs/hive/spark世界的新手。我有一个大型json文件的集合(〉50 GB每个文件),我试图加载,以便查询特定的键。有一个标准的布局的json文件,但不是所有的文件包含所有的键。我已经创建了一个自定义模式,包括整个标准,并创建了一个 Dataframe 使用此模式。当我试图读取/查询json文件只返回空值。我正在尝试使用hive/spark在EMR集群中POC加载文件。是否有更好的方法来处理TB的json文件?
为了简化问题,假设完整的标准是:reporting_entity_name字符串reporting_entity_type字符串版本字符串
文件A将有reporting_entity_name和version,但没有reporting_entity_type。文件B将有reporting_entity_name和reporting_entity_type,但没有version。
scala〉spark.sqlContext.sql(“SELECT reporting_entity_name,reporting_entity_type,version FROM vw“).show()
我需要文件A返回:
+-———————————————————-+-———————————————————-+-—————-+
|报告实体名称|报告实体类型|版本|
+-———————————————————-+-———————————————————-+-—————-+
| Mega Company Inc.|零 |1.3.1|
+-———————————————————-+-———————————————————-+-—————-+
文件B返回:
+-———————————————————-+-———————————————————-+-—————-+
|报告实体名称|报告实体类型|版本|
+-———————————————————-+-———————————————————-+-—————-+
| Mega Company Inc.|健康保险|零|
+-———————————————————-+-———————————————————-+-—————-+
但是从这两个文件返回的是:
+-———————————————————-+-———————————————————-+-—————-+
|报告实体名称|报告实体类型|版本|
+-———————————————————-+-———————————————————-+-—————-+
| 零 |零 |零|
+-———————————————————-+-———————————————————-+-—————-+
实际的标准json比上面的例子更复杂,可以在这里找到:https://github.com/CMSgov/price-transparency-guide/tree/master/schemas/in-network-rates#providers-object
下面是我的自定义模式:

`val schema = StructType(List(
StructField("reporting_entity_name", StringType), 
StructField("reporting_entity_type", StringType), 
StructField("plan_name", StringType), 
StructField("plan_id_type", StringType),  
StructField("plan_id", StringType), 
StructField("plan_market_type", StringType), 
StructField("in_network", ArrayType(StructType(List(
    StructField("negotiation_arrangement", StringType), 
    StructField("plan_name", StringType), 
    StructField("billing_code_type", StringType), 
    StructField("billing_code_type_version", StringType), 
    StructField("billing_code", StringType), 
    StructField("description", StringType), 
    StructField("negotiated_rates", ArrayType(StructType(Array(
        StructField("negotiated_prices", ArrayType(StructType(Array(
            StructField("negotiated_type", StringType),
            StructField("negotiated_rate", StringType),
            StructField("expiration_date", StringType),
            StructField("service_code", ArrayType(StringType)),
            StructField("billing_class", StringType),
            StructField("billing_code_modifier", ArrayType(StringType)),
            StructField("additional_information", StringType)))), true),
        StructField("provider_groups", ArrayType(StructType(Array(
            StructField("npi", ArrayType(StringType)),
            StructField("tin", StringType))))),
        StructField("provider_references", ArrayType(StructType(Array(
            StructField("provider_group_id", StringType),   
            StructField("provider_groups", ArrayType(StructType(Array(
                StructField("npi", ArrayType(StringType)),
                StructField("tin", StringType))))),
                StructField("location", StringType))))))))),
    StructField("bundled_codes", ArrayType(StructType(Array(
        StructField("billing_code_type", StringType),
        StructField("billing_code_type_version", StringType),
        StructField("billing_code", StringType),
        StructField("description", StringType)))), true),
    StructField("covered_services", ArrayType(StructType(Array(
        StructField("billing_code_type", StringType),
        StructField("billing_code_type_version", StringType),
        StructField("billing_code", StringType),
        StructField("description", StringType)))), true),
        ))), true ),
StructField("provider_references",ArrayType(StructType(Array(
        StructField("provider_group_id",StringType),
        StructField("provider_groups",ArrayType(StructType(Array(
            StructField("npi",ArrayType(StringType),true),
            StructField("tin",StringType)))),true),
            StructField("location",StringType)))),true),

StructField(“last_updated_on”,StringType),
StructField(“version”,StringType)))`
一个小的示例文件(上述标准)可以在这里找到:https://webtpa-public-access.s3.us-west-2.amazonaws.com/subfolder/2023-04_070_09B0_in-network-rates_01_of_122.json.gz

km0tfn4u

km0tfn4u1#

在链接的文档中,在表中,它说像plan_name这样的值是not必需的。因此,您需要添加第三个参数True以确保该字段的可空性。
对所有其他非必填字段重复相同的操作。或者确保将所有必填字段设置为False可为空。
或者,更好的方法-
似乎有一个JSON模式文件提供在
https://github.com/CMSgov/price-transparency-guide/blob/master/schemas/in-network-rates/in-network-rates.json
您可以使用DataType.fromJson(fileContentsAsString)来读取它。
Scala文档没有完整的示例,但PySpark有。
因此,您可以将其与from_json(e: Column, schema: DataType)函数或spark.read.schema(dataType).json("path")一起使用

相关问题