azure 动态执行DLT管道以在Databricks中创建SCD 2

thigvfpy  于 2023-10-22  发布在  其他
关注(0)|答案(1)|浏览(98)

我试图通过Databricks DLT管道加载我的datalake SCD 2表。我的目标是通过在DLT管道中的“配置”设置中传递所需的信息来动态加载notebook中的表。
“配置”设置可能看起来像这样:

{
    "object1": {
        "object_name": "table",
        "business_key_column": "table_bk",
        "except_column_list": "change_date",
        "sequence_column": "change_date"
    },
    "object2"...
}

稍后,目标笔记本应该获取管道配置中的参数,并相应地加载表。我可以在目标笔记本中获取管道配置设置吗?如果是:如何进行?

yquaqz18

yquaqz181#

可以。您可以使用spark configuration在notebook中获取管道配置设置。
如本文档所述,通过spark配置,可以在管道查询中使用配置设置。

您可以使用下面的语句spark.conf.get("key")访问。
下面是给出的配置。

并在笔记本中访问。

import dlt

@dlt.table(
comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
)
def clickstream_raw():
    json_path = spark.conf.get("json_path")
    return (spark.read.format("json").load(json_path))

根据给定的路径,返回该嵌套框架。
在你的情况下,你可以像下面这样得到它。

import dlt

@dlt.table()
def object_list():
    import json
    
    json_obj = json.loads(spark.conf.get("object_list"))
    data = [json.dumps(json_obj[i]) for i in json_obj]
    json_df = spark.read.json(spark.sparkContext.parallelize(data))
    return json_df

获取json对象后,您可以相应地使用它。
输出:

相关问题