aws glue-dynamicframe在json文件中具有不同的模式

qlckcl4x  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(550)

示例:我在glue目录中有一个带有ddl的分区表:

  1. CREATE EXTERNAL TABLE `test`(
  2. `id` int,
  3. `data` struct<a:string,b:string>)
  4. PARTITIONED BY (
  5. `partition_0` string)
  6. ROW FORMAT SERDE
  7. 'org.openx.data.jsonserde.JsonSerDe'

s3中的底层数据是json文件,具有不同的模式,这意味着某些元素可能不存在于某些文件中,而存在于其他文件中。
在这个示例中,分区\u 0='01'包含包含所有元素的json文件:

  1. {"id": 1,"data": {"a": "value-a", "b": "value-b"}}

分区\u 0='02'中的文件不包含元素数据。b:

  1. {"id": 1,"data": {"a": "value-a"}}

问题:当我在glue中创建dynamicframe(我使用python)时,它的模式取决于我查询的数据。如果我包含来自分区\u 0='01'的数据,那么所有元素都存在于架构中。

  1. id_partition_predicate="partition_0 = '01'"
  2. print("partition with 'b'")
  3. glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = "test", push_down_predicate = id_partition_predicate).printSchema()
  4. partition with 'b'
  5. root
  6. |-- id: int
  7. |-- data: struct
  8. | |-- a: string
  9. | |-- b: string
  10. |-- partition_0: string
  1. print("both partitions")
  2. glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = "test").printSchema()
  3. both partitions
  4. root
  5. |-- id: int
  6. |-- data: struct
  7. | |-- a: string
  8. | |-- b: string
  9. |-- partition_0: string

如果我只查询分区\u 0='02'中的数据,那么元素data.b不存在于dynamicframe架构中,即使它存在于表定义中。

  1. print("partition without 'b'")
  2. id_partition_predicate="partition_0 = '02'"
  3. glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = "test", push_down_predicate = id_partition_predicate).printSchema()
  4. partition without 'b'
  5. root
  6. |-- id: int
  7. |-- data: struct
  8. | |-- a: string
  9. |-- partition_0: string

问题:如何创建dynamicframe或dataframe来始终包含粘合表模式中的所有元素?
提前谢谢!

sxpgvts3

sxpgvts31#

想出了这个解决方案:

  1. id_partition_predicate="partition_0 = '02'"
  2. dyf = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = "test", push_down_predicate = id_partition_predicate)
  3. dyf.printSchema()
  4. df=dyf.toDF()
  5. try:
  6. df = df.withColumn("b", col("data").getItem("b"))
  7. except:
  8. df = df.withColumn("b", lit(None).cast(StringType()))
  9. df.show()

输出:

  1. root
  2. |-- id: int
  3. |-- data: struct
  4. | |-- a: string
  5. |-- partition_0: string
  6. +---+---------+-----------+----+
  7. | id| data|partition_0| b|
  8. +---+---------+-----------+----+
  9. | 1|[value-a]| 02|null|
  10. +---+---------+-----------+----+
展开查看全部

相关问题