我是emr/hdfs/hive/spark世界的新手。我有一个大型json文件的集合(〉50 GB每个文件),我试图加载,以便查询特定的键。有一个标准的布局的json文件,但不是所有的文件包含所有的键。我已经创建了一个自定义模式,包括整个标准,并创建了一个 Dataframe 使用此模式。当我试图读取/查询json文件只返回空值。我正在尝试使用hive/spark在EMR集群中POC加载文件。是否有更好的方法来处理TB的json文件?
为了简化问题,假设完整的标准是:reporting_entity_name字符串reporting_entity_type字符串版本字符串
文件A将有reporting_entity_name和version,但没有reporting_entity_type。文件B将有reporting_entity_name和reporting_entity_type,但没有version。
scala〉spark.sqlContext.sql(“SELECT reporting_entity_name,reporting_entity_type,version FROM vw“).show()
我需要文件A返回:
+-———————————————————-+-———————————————————-+-—————-+
|报告实体名称|报告实体类型|版本|
+-———————————————————-+-———————————————————-+-—————-+
| Mega Company Inc.|零 |1.3.1|
+-———————————————————-+-———————————————————-+-—————-+
文件B返回:
+-———————————————————-+-———————————————————-+-—————-+
|报告实体名称|报告实体类型|版本|
+-———————————————————-+-———————————————————-+-—————-+
| Mega Company Inc.|健康保险|零|
+-———————————————————-+-———————————————————-+-—————-+
但是从这两个文件返回的是:
+-———————————————————-+-———————————————————-+-—————-+
|报告实体名称|报告实体类型|版本|
+-———————————————————-+-———————————————————-+-—————-+
| 零 |零 |零|
+-———————————————————-+-———————————————————-+-—————-+
实际的标准json比上面的例子更复杂,可以在这里找到:https://github.com/CMSgov/price-transparency-guide/tree/master/schemas/in-network-rates#providers-object
下面是我的自定义模式:
`val schema = StructType(List(
StructField("reporting_entity_name", StringType),
StructField("reporting_entity_type", StringType),
StructField("plan_name", StringType),
StructField("plan_id_type", StringType),
StructField("plan_id", StringType),
StructField("plan_market_type", StringType),
StructField("in_network", ArrayType(StructType(List(
StructField("negotiation_arrangement", StringType),
StructField("plan_name", StringType),
StructField("billing_code_type", StringType),
StructField("billing_code_type_version", StringType),
StructField("billing_code", StringType),
StructField("description", StringType),
StructField("negotiated_rates", ArrayType(StructType(Array(
StructField("negotiated_prices", ArrayType(StructType(Array(
StructField("negotiated_type", StringType),
StructField("negotiated_rate", StringType),
StructField("expiration_date", StringType),
StructField("service_code", ArrayType(StringType)),
StructField("billing_class", StringType),
StructField("billing_code_modifier", ArrayType(StringType)),
StructField("additional_information", StringType)))), true),
StructField("provider_groups", ArrayType(StructType(Array(
StructField("npi", ArrayType(StringType)),
StructField("tin", StringType))))),
StructField("provider_references", ArrayType(StructType(Array(
StructField("provider_group_id", StringType),
StructField("provider_groups", ArrayType(StructType(Array(
StructField("npi", ArrayType(StringType)),
StructField("tin", StringType))))),
StructField("location", StringType))))))))),
StructField("bundled_codes", ArrayType(StructType(Array(
StructField("billing_code_type", StringType),
StructField("billing_code_type_version", StringType),
StructField("billing_code", StringType),
StructField("description", StringType)))), true),
StructField("covered_services", ArrayType(StructType(Array(
StructField("billing_code_type", StringType),
StructField("billing_code_type_version", StringType),
StructField("billing_code", StringType),
StructField("description", StringType)))), true),
))), true ),
StructField("provider_references",ArrayType(StructType(Array(
StructField("provider_group_id",StringType),
StructField("provider_groups",ArrayType(StructType(Array(
StructField("npi",ArrayType(StringType),true),
StructField("tin",StringType)))),true),
StructField("location",StringType)))),true),
StructField(“last_updated_on”,StringType),
StructField(“version”,StringType)))`
一个小的示例文件(上述标准)可以在这里找到:https://webtpa-public-access.s3.us-west-2.amazonaws.com/subfolder/2023-04_070_09B0_in-network-rates_01_of_122.json.gz
1条答案
按热度按时间km0tfn4u1#
在链接的文档中,在表中,它说像
plan_name
这样的值是not必需的。因此,您需要添加第三个参数True
以确保该字段的可空性。对所有其他非必填字段重复相同的操作。或者确保将所有必填字段设置为False可为空。
或者,更好的方法-
似乎有一个JSON模式文件提供在
https://github.com/CMSgov/price-transparency-guide/blob/master/schemas/in-network-rates/in-network-rates.json
您可以使用
DataType.fromJson(fileContentsAsString)
来读取它。Scala文档没有完整的示例,但PySpark有。
因此,您可以将其与
from_json(e: Column, schema: DataType)
函数或spark.read.schema(dataType).json("path")
一起使用