使用struct字段将qubole配置单元表同步到snowflake

xytpbqjk  于 2021-06-27  发布在  Hive
关注(0)|答案(1)|浏览(358)

我有一张table,像下面这张table:

use dm;

CREATE EXTERNAL TABLE IF NOT EXISTS fact (
    id string,
    fact_attr struct<
        attr1 : String,
        attr2 : String
    >
)
STORED AS PARQUET
LOCATION 's3://my-bucket/DM/fact'

我用雪花创建了如下的平行表:

CREATE TABLE IF NOT EXISTS dm.fact (
    id string,
    fact_attr variant
)

我的etl进程将数据加载到qubole表中,如下所示:

+------------+--------------------------------+
| id         | fact_attr                      |
+------------+--------------------------------+
| 1          | {"attr1": "a1", "attr2": "a2"} |
| 2          | {"attr1": "a3", "attr2": null} |
+------------+--------------------------------+

我正在尝试使用merge命令将此数据同步到snowflake,如

MERGE INTO DM.FACT dst USING %s src 
    ON dst.id = src.id
WHEN MATCHED THEN UPDATE SET
    fact_attr = parse_json(src.fact_attr)
WHEN NOT MATCHED THEN INSERT (
    id,
    fact_attr
) VALUES (
    src.id,
    parse_json(src.fact_attr)
);

我正在使用pyspark同步数据:

df.write \
  .option("sfWarehouse", sf_warehouse) \
  .option("sfDatabase", sf_database) \
  .option("sfSchema", sf_schema) \
  .option("postactions", query) \
  .mode("overwrite") \
  .snowflake("snowflake", sf_warehouse, sf_temp_table)

使用上述命令,我得到以下错误:

pyspark.sql.utils.IllegalArgumentException: u"Don't know how to save StructField(fact_attr,StructType(StructField(attr1,StringType,true), StructField(attr2,StringType,true)),true) of type attributes to Snowflake"

我阅读了以下链接,但没有成功:
半结构化数据类型
查询半结构化数据
问题:
如何从具有结构字段的qubole配置单元表插入/同步数据到snowflake?

9rnv2umw

9rnv2umw1#

尝试此操作时使用的用于snowflake的spark connector版本缺少对各种数据类型的支持。
在连接器版本2.4.4(2018年7月发布)中引入了支持,其中structtype字段现在自动Map到将与merge命令一起使用的变量数据类型。

相关问题