如何在Databricks(PySpark)中使用Merge Into,给定源有空值插入到定义的目标模式(delta表)中

jq6vz3qz  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(124)

背景资料:我正在构建一个json解析器,它可以接受任何格式的json,并将其写入一个delta表,该表具有一个可以根据新数据/新列更新的模式。
我尝试用传入的json文件(源)更新一个已经存在的Databricks Delta表(目标),并在Databricks中执行MERGE INTO操作。
举个例子,假设现有的表看起来像这样:
目标(现有增量表)
| 嵌套|ID|费|
| --|--|--|
| [{“code”:null,“value”:“11”},{“code”:null,“value”:“16”}]| 47 |五十七|
| null| 48 |null|
| [{“code”:null,“value”:“14”},{“code”:null,“value”:“12”},{“code”:null,“value”:“14”}]| 51 |null|
源代码(传入JSON)
| 嵌套|ID|费|
| --|--|--|
| null| 47 |三十六点|
| [{“code”:null,“value”:“14”},{“code”:null,“value”:“12”},{“code”:null,“value”:“14”}]| 52 |十七点|
期望的结果(第一行被颠倒,嵌套值现在为null,ID 52被插入到末尾):
| 嵌套|ID|费|
| --|--|--|
| null| 47 |三十六点|
| null| 48 |null|
| [{“code”:null,“value”:“14”},{“code”:null,“value”:“12”},{“code”:null,“value”:“14”}]| 51 |null|
| [{“code”:null,“value”:“14”},{“code”:null,“value”:“12”},{“code”:null,“value”:“14”}]| 52 |十七点|
现在,我正在根据目标和源模式动态生成一个set字符串(它们在很多情况下有数百列长,这个解析器对于70+种不同的json模式是通用的).目标模式来自现有的delta表,而源模式是一个读入json源代码的对象。这个源模式可以包含更多以前不在目标中的列schema,我想将这些列添加到表中,这在设置spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")时有效。问题是null列不能自动转换为复杂的json结构-据我所知。我目前的MERGE INTO语句具有动态设置字符串:

  1. MERGE INTO {existing_table_name} t
  2. USING {new_source_json} s
  3. ON t.ID = s.ID
  4. WHEN MATCHED THEN
  5. UPDATE SET {set_string}
  6. WHEN NOT MATCHED
  7. THEN INSERT {set_string_insert}

字符串
我得到以下错误:

  1. Failed to merge incompatible data types ArrayType(StructType(StructField(code,StringType,true),StructField(value,StringType,true)),true) and StringType


这是因为目标表的嵌套列类型是ArrayType(StructType(StructField(code,StringType,true),StructField(value,StringType,true)),true),但是用于尝试和更新ID 47的null值在Spark中被转换为字符串。
我生成的set_strings看起来像这样:

  1. t.nested = s.nested, t.ID = s.ID, t.Fee = s.Fee


我还尝试在MERGE INTO语句本身中转换列:

  1. MERGE INTO {existing_table_name} t
  2. USING {new_source_json} s
  3. ON t.ID = s.ID
  4. WHEN MATCHED THEN
  5. UPDATE SET t.Nested = cast(s.Nested as array<struct<code:string,value:string>>),t.ID = s.ID, t.Fee = s.Fee
  6. WHEN NOT MATCHED
  7. THEN INSERT (Nested,ID,Fee)
  8. VALUES(s.cast(s.nested as array<struct<code:string,value:string>>), s.ID, s.Fee)


这仍然会产生同样的错误。在进入目标之前,我是否必须转换源对象框架列?对于200多个列来说,使用withColumn函数逐个处理任何不匹配的类型是相当繁琐和昂贵的。
有没有一种方法可以使用MERGE INTO或任何其他操作在Databricks中实现所需的结果?我宁愿避免手动创建模式,除非这是唯一的前进路线。

n7taea2i

n7taea2i1#

最后,我绕过了merge语句,而是使用现有的tables模式加载到新的json文件中。(意味着列数据类型发生了变化,或者向JSON中添加了新列),然后,我必须启用模式演化,以确保捕获这个新列。上一个表(passed_schema变量):

  1. old_table = spark.table(f"{full_table_name}")
  2. passed_schema = old_table.schema
  3. def read_from_blob_as_pyspark_df_json_with_schema(file_path, container_name, storage_account_name, schema):
  4. spark_df = spark.read.format('json')\
  5. .schema(schema)\
  6. .load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{file_path}");
  7. return spark_df
  8. raw_json_df = read_from_blob_as_pyspark_df_json_with_schema(f, container_name, storage_account_name, passed_schema)

字符串
使用passed_schema沿着新的raw_json_df,您可以执行merge语句。同样,如果存在不匹配,您可以手动添加新列,或者使用另一个函数来允许新列的模式演变。

相关问题