背景资料:我正在构建一个json解析器,它可以接受任何格式的json,并将其写入一个delta表,该表具有一个可以根据新数据/新列更新的模式。
我尝试用传入的json文件(源)更新一个已经存在的Databricks Delta表(目标),并在Databricks中执行MERGE INTO操作。
举个例子,假设现有的表看起来像这样:
目标(现有增量表)
| 嵌套|ID|费|
| --|--|--|
| [{“code”:null,“value”:“11”},{“code”:null,“value”:“16”}]| 47 |五十七|
| null| 48 |null|
| [{“code”:null,“value”:“14”},{“code”:null,“value”:“12”},{“code”:null,“value”:“14”}]| 51 |null|
源代码(传入JSON)
| 嵌套|ID|费|
| --|--|--|
| null| 47 |三十六点|
| [{“code”:null,“value”:“14”},{“code”:null,“value”:“12”},{“code”:null,“value”:“14”}]| 52 |十七点|
期望的结果(第一行被颠倒,嵌套值现在为null,ID 52被插入到末尾):
| 嵌套|ID|费|
| --|--|--|
| null| 47 |三十六点|
| null| 48 |null|
| [{“code”:null,“value”:“14”},{“code”:null,“value”:“12”},{“code”:null,“value”:“14”}]| 51 |null|
| [{“code”:null,“value”:“14”},{“code”:null,“value”:“12”},{“code”:null,“value”:“14”}]| 52 |十七点|
现在,我正在根据目标和源模式动态生成一个set字符串(它们在很多情况下有数百列长,这个解析器对于70+种不同的json模式是通用的).目标模式来自现有的delta表,而源模式是一个读入json源代码的对象。这个源模式可以包含更多以前不在目标中的列schema,我想将这些列添加到表中,这在设置spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
时有效。问题是null列不能自动转换为复杂的json结构-据我所知。我目前的MERGE INTO语句具有动态设置字符串:
MERGE INTO {existing_table_name} t
USING {new_source_json} s
ON t.ID = s.ID
WHEN MATCHED THEN
UPDATE SET {set_string}
WHEN NOT MATCHED
THEN INSERT {set_string_insert}
字符串
我得到以下错误:
Failed to merge incompatible data types ArrayType(StructType(StructField(code,StringType,true),StructField(value,StringType,true)),true) and StringType
型
这是因为目标表的嵌套列类型是ArrayType(StructType(StructField(code,StringType,true),StructField(value,StringType,true)),true)
,但是用于尝试和更新ID 47的null值在Spark中被转换为字符串。
我生成的set_strings看起来像这样:
t.nested = s.nested, t.ID = s.ID, t.Fee = s.Fee
型
我还尝试在MERGE INTO语句本身中转换列:
MERGE INTO {existing_table_name} t
USING {new_source_json} s
ON t.ID = s.ID
WHEN MATCHED THEN
UPDATE SET t.Nested = cast(s.Nested as array<struct<code:string,value:string>>),t.ID = s.ID, t.Fee = s.Fee
WHEN NOT MATCHED
THEN INSERT (Nested,ID,Fee)
VALUES(s.cast(s.nested as array<struct<code:string,value:string>>), s.ID, s.Fee)
型
这仍然会产生同样的错误。在进入目标之前,我是否必须转换源对象框架列?对于200多个列来说,使用withColumn函数逐个处理任何不匹配的类型是相当繁琐和昂贵的。
有没有一种方法可以使用MERGE INTO或任何其他操作在Databricks中实现所需的结果?我宁愿避免手动创建模式,除非这是唯一的前进路线。
1条答案
按热度按时间n7taea2i1#
最后,我绕过了merge语句,而是使用现有的tables模式加载到新的json文件中。(意味着列数据类型发生了变化,或者向JSON中添加了新列),然后,我必须启用模式演化,以确保捕获这个新列。上一个表(
passed_schema
变量):字符串
使用
passed_schema
沿着新的raw_json_df
,您可以执行merge语句。同样,如果存在不匹配,您可以手动添加新列,或者使用另一个函数来允许新列的模式演变。