Spark handling of polymorphic JSON

w1e3prcc  posted on 2021-07-14 in Spark

Consider the following JSON input:

    {
      "common": { "type": "A", "date": "2020-01-01T12:00:00" },
      "data": {
        "name": "Dave",
        "pets": [ "dog", "cat" ]
      }
    }
    {
      "common": { "type": "B", "date": "2020-01-01T12:00:00" },
      "data": {
        "whatever": { "X": {"foo": 3}, "Y": "bar" },
        "favoriteInts": [ 0, 1, 7 ]
      }
    }

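I am familiar with json-schema, where I could describe the data substructure as being either (name, pets) or (whatever, favoriteInts). We use the common.type field to identify which type a record is.
Is this possible in a Spark schema definition? My initial experiments went along these lines: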

    from pyspark.sql.types import StructType, StructField

    schema = StructType([
        StructField("common", StructType(common_schema)),  # .. because the type is consistent
        StructField("data", StructType())  # attempting to declare a "generic" struct
    ])
    df = spark.read.option("multiline", "true").json(source, schema)

This does not work. In the input, the data struct contains, well, anything (two fields in this particular example), yet after reading we get:

    +--------------------+----+
    |              common|data|
    +--------------------+----+
    |{2020-01-01T12:00...|  {}|
    +--------------------+----+

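and any attempt to extract a named field fails with No such struct field <whatever>. Leaving the "generic struct" out of the schema definition altogether yields a DataFrame with no field named data at all, never mind the fields inside it.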
Beyond this, what I am ultimately trying to do is something like:

    df = spark.read.json(source)

    def processA(frame):
        # we KNOW name exists for type A
        # (frame.data.name would resolve to the Column.name method, so use the path string)
        frame.select("data.name")
        ...

    def processB(frame):
        frame.select("data.favoriteInts")  # we KNOW favoriteInts exists for type B
        ...

    processA(df.filter(df.common.type == "A"))
    processB(df.filter(df.common.type == "B"))

tkclm6bt  #1

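You can use nested and nullable types (by specifying True) to accommodate the uncertainty: declare the fields of every type in one data struct, and the fields that a given record does not carry come back as null.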

    from pyspark.sql.types import StructType, StringType, ArrayType, StructField, IntegerType

    data_schema = StructType([
        # Type A related attributes
        StructField("name", StringType(), True),  # True implies nullable
        StructField("pets", ArrayType(StringType()), True),
        # Type B related attributes
        StructField("whatever", StructType([
            StructField("X", StructType([
                StructField("foo", IntegerType(), True)
            ]), True),
            StructField("Y", StringType(), True)
        ]), True),  # True implies nullable
        StructField("favoriteInts", ArrayType(IntegerType()), True),
    ])

    schema = StructType([
        StructField("common", StructType(common_schema)),  # .. because the type is consistent
        StructField("data", data_schema)
    ])