Azure数据流:将同一类型的多个JSON对象解析为SQL

8oomwypt  于 2023-05-29  发布在  其他
关注(0)|答案(1)|浏览(137)

我有一个像下面这样的JSON,其中“报告”包含多个具有不同名称的对象。我需要使用Azure数据工厂将每个对象转换为数据库的行。

{
"reports": {
    "Z1-B5(CR)-L5-M": {
        "location_id": 1580,
        "date": "2023-05-24",
        "h19": 0,
        "h20": 0,
        "h21": 0,
        "h22": 0,
        "h23": 0,
        "peak_hour": 10,
        "total": 6,
        "location_name": "Z1-B5(CR)-L5-M"
    },
    "Z2-B22(FC)-L1-M": {
        "location_id": 1589,
        "date": "2023-05-24",
        "h19": 0,
        "h20": 0,
        "h21": 0,
        "h22": 0,
        "h23": 0,
        "peak_hour": 14,
        "total": 212,
        "location_name": "Z2-B22(FC)-L1-F"
    }
}

}
预期输出:

lawou6xi

lawou6xi1#

  • 由于reports是一组对象,因此您必须使用Azure数据流和ADF活动的组合。我需要提取reports属性中的键,使用该属性创建一个对象数组,其中每个对象表示一行。
  • 我已经采取了一个示例JSON文件作为数据流的源(问题中给出的数据)。

  • 使用select转换并使用csv作为sink(没有更改,第一行作为header被选中),我已经写入了数据。以下是数据流JSON:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "select1"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          reports as ({Z1-B5(CR)-L5-M} as (location_id as integer, date as string, h19 as integer, h20 as integer, h21 as integer, h22 as integer, h23 as integer, peak_hour as integer, total as integer, location_name as string), {Z2-B22(FC)-L1-M} as (location_id as integer, date as string, h19 as integer, h20 as integer, h21 as integer, h22 as integer, h23 as integer, peak_hour as integer, total as integer, location_name as string))",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'singleDocument') ~> source1",
                "source1 select(mapColumn(",
                "          each(reports,match(true()))",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "select1 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     partitionFileNames:['op.csv'],",
                "     umask: 0022,",
                "     preCommands: [],",
                "     postCommands: [],",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     saveOrder: 1,",
                "     partitionBy('hash', 1)) ~> sink1"
            ]
        }
    }
}

  • 正如您所看到的,在转换之后,reports属性键被提取到第一行。
  • 现在,运行此数据流,使用查找(lookup1)对上述文件进行数据集配置,如下图所示:

  • 此查找结果将如下图所示:

  • 现在使用,作为分隔符拆分字符串,并使用生成的数组迭代每个循环。对原始源文件使用第二次查找(Lookup2)来提取数据。
  • 在中为每个Activity使用附加变量Activity以获取所需的对象,如下所示:
@activity('Lookup2').output.value[0].reports[item()]
  • 我使用了一个样本集变量来显示附加变量活动的结果。

  • 如果您的数据库是SQL服务器,则可以使用openjson在查询中使用上述变量数据。如果是任何其他格式,请将此变量写入新文件,并使用复制数据活动复制到数据库表中。以下是管道JSON供参考。
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Data flow1",
                "type": "ExecuteDataFlow",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataflow": {
                        "referenceName": "dataflow1",
                        "type": "DataFlowReference"
                    },
                    "compute": {
                        "coreCount": 8,
                        "computeType": "General"
                    },
                    "traceLevel": "None",
                    "cacheSinks": {
                        "firstRowOnly": false
                    }
                }
            },
            {
                "name": "Lookup1",
                "type": "Lookup",
                "dependsOn": [
                    {
                        "activity": "Data flow1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    },
                    "dataset": {
                        "referenceName": "DelimitedText2",
                        "type": "DatasetReference"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    },
                    {
                        "activity": "Lookup2",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@split(activity('Lookup1').output.firstRow['Prop_0'],',')",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Append variable1",
                            "type": "AppendVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "final_data",
                                "value": {
                                    "value": "@activity('Lookup2').output.value[0].reports[item()]",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "Lookup2",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "JsonSource",
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "JsonReadSettings"
                        }
                    },
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "Set variable1",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "final",
                    "value": {
                        "value": "@variables('final_data')",
                        "type": "Expression"
                    }
                }
            }
        ],
        "variables": {
            "tp": {
                "type": "String"
            },
            "final_data": {
                "type": "Array"
            },
            "final": {
                "type": "Array"
            }
        },
        "annotations": []
    }
}

相关问题