如何在ADF中动态使用upsert到Azure SQL数据库

hts6caw3  于 2023-04-22  发布在  其他
关注(0)|答案(2)|浏览(161)

在我的管道中,我正在运行一个查找,从一个数据库中收集表,以便传输到另一个数据库。

select table_schema,table_name from information_schema.tables where table_schema = 'schema1' OR TABLE_SCHEMA = 'schema2'

然后,我对每个表执行一个复制活动。此时,我已将接收器选项设置为“insert”。然而,我想尝试使用upsert,但我必须确定“key columns(s)”以便这样做。尝试动态获取这些的最佳方法是什么?哪些列最适合用于此操作?哪些列将包含该行的唯一值?

hyrbngr7

hyrbngr71#

对于键列,需要使用唯一标识该表中的行的列或列组合。
您可以将这个键列表添加到上面的查询结果中,方法是从元数据中获取它们(如果您在该表上有主键或唯一索引),或者将它们保存在某个管理表中,然后查询该表。
然后你可以在ADF中使用动态内容来提供接收标签上的键列。请注意键列应该是数组类型(因为它是列的列表)。

zc0qhyus

zc0qhyus2#

为查找、源和目标创建数据库链接服务创建管道并执行查找活动,以使用以下查询选项获取源数据库中的表列表。

SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.TABLES
WHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'dbo'

成功执行查找连接foreach活动后,启用顺序并将@activity('Lookup1').output.value作为动态内容添加到for each中,添加查找活动到foreach,并使用以下查询执行查询以获取表的键列

SELECT column_name
FROM information_schema.key_column_usage

在成功执行查找连接复制数据活动后,创建源sql数据集,其中创建了链接,并创建两个名为Schema和tableName的参数来检索数据库的表,并给出以下值:

schema: @item().TABLE_SCHEMA
tableName: @item().TABLE_NAME

创建接收数据集并创建上述参数并给予相同的值。启用upsert选项并为键列添加动态内容为@createArray(activity('Lookup2').output.firstRow.column_name)

调试它成功执行的管道并成功更新目标中的表。

Pipeline Json:

{
    "name": "Pipeline 2",
    "properties": {
        "activities": [
            {
                "name": "Lookup1",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.TABLES\nWHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'dbo'",
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "AzureSqlTable3",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Lookup1').output.value",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Lookup2",
                            "type": "Lookup",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "sqlReaderQuery": "SELECT column_name\nFROM information_schema.key_column_usage",
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "dataset": {
                                    "referenceName": "AzureSqlTable3",
                                    "type": "DatasetReference"
                                }
                            }
                        },
                        {
                            "name": "Copy_3q5",
                            "type": "Copy",
                            "dependsOn": [
                                {
                                    "activity": "Lookup2",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [
                                {
                                    "name": "Source",
                                    "value": "files//input.xml"
                                },
                                {
                                    "name": "Destination",
                                    "value": "dbo.input"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "sink": {
                                    "type": "AzureSqlSink",
                                    "writeBehavior": "upsert",
                                    "upsertSettings": {
                                        "useTempDB": true,
                                        "keys": {
                                            "value": "@createArray(activity('Lookup2').output.firstRow.column_name)",
                                            "type": "Expression"
                                        }
                                    },
                                    "sqlWriterUseTableLock": false,
                                    "disableMetricsCollection": false
                                },
                                "enableStaging": false,
                                "translator": {
                                    "type": "TabularTranslator",
                                    "typeConversion": true,
                                    "typeConversionSettings": {
                                        "allowDataTruncation": true,
                                        "treatBooleanAsNumber": false
                                    }
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "DestinationDataset_3q5",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "tableName": {
                                            "value": "@item().TABLE_NAME",
                                            "type": "Expression"
                                        },
                                        "schema": {
                                            "value": "@item().TABLE_SCHEMA",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "AzureSqlTable2",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "schema": {
                                            "value": "@item().TABLE_SCHEMA",
                                            "type": "Expression"
                                        },
                                        "tableName": {
                                            "value": "@item().TABLE_NAME",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "variables": {
            "ok": {
                "type": "String"
            }
        },
        "annotations": [],
        "lastPublishTime": "2023-04-19T11:45:57Z"
    },
    "type": "Microsoft.Synapse/workspaces/pipelines"
}

相关问题