Splitting an array containing nested arrays with Scala in Azure Databricks

uwopmtnx · asked 2021-07-12 · in Spark

I'm currently working on a project where I have to extract some horribly nested data from a JSON document (the output of a Log Analytics REST API call). A sample of the document structure is below (I have many more columns):

{
"tables": [
    {
        "name": "PrimaryResult",
        "columns": [
            {
                "name": "Category",
                "type": "string"
            },
            {
                "name": "count_",
                "type": "long"
            }
        ],
        "rows": [
            [
                "Administrative",
                20839
            ],
            [
                "Recommendation",
                122
            ],
            [
                "Alert",
                64
            ],
            [
                "ServiceHealth",
                11
            ]
        ]
    }
] }

I've successfully read this JSON document into a DataFrame, but I'm struggling to work out what to do next.
The output I'm trying to achieve looks like this:

[{
"Category": "Administrative",
"count_": 20839
},
{
    "Category": "Recommendation",
    "count_": 122
},
{
    "Category": "Alert",
    "count_": 64
},
{
    "Category": "ServiceHealth",
    "count_": 11
}]

Ideally, I'd like to use the columns array as the header for each record, and then split each record array out of the parent rows array into its own record.
So far I've tried flattening the originally imported DataFrame, but that doesn't work because the row data is an array of arrays.
How can I solve this?
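
For reference, the ingestion step looks roughly like this (the file path is a placeholder). Note that because each row mixes strings and longs, Spark's schema inference falls back to strings, so rows comes in as array<array<string>>:

import org.apache.spark.sql.functions._

// Read the multi-line JSON document (hypothetical path).
val raw = spark.read.option("multiline", true).json("/path/to/loganalytics.json")

// rows is inferred as array<array<string>>: the mixed string/long
// values are resolved to string during schema inference.
raw.printSchema()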


Answer 1 (bqucvtff)

It's a bit fiddly to handle, but here's one way (note that collect_list does not guarantee ordering, so this relies on the collected lists staying aligned when zipped):

import org.apache.spark.sql.functions._

val df = spark.read.option("multiline", true).json("filepath")

val result = df.select(explode($"tables").as("tables"))
    // pair the columns metadata with each exploded row array
    .select($"tables.columns".as("col"), explode($"tables.rows").as("row"))
    // zip column metadata with row values: one output row per (column, value) pair
    .selectExpr("inline(arrays_zip(col, row))")
    // pivot the column names into actual columns, collecting their values
    .groupBy()
    .pivot($"col.name")
    .agg(collect_list($"row"))
    // zip the collected lists back together and expand into records
    .selectExpr("inline(arrays_zip(Category, count_))")

result.show
+--------------+------+
|      Category|count_|
+--------------+------+
|Administrative| 20839|
|Recommendation|   122|
|         Alert|    64|
| ServiceHealth|    11|
+--------------+------+

To get the JSON output, you can do:

val result_json = result.agg(to_json(collect_list(struct("Category", "count_"))).as("json"))

result_json.show(false)
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|json                                                                                                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"Category":"Administrative","count_":"20839"},{"Category":"Recommendation","count_":"122"},{"Category":"Alert","count_":"64"},{"Category":"ServiceHealth","count_":"11"}]|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
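
Note that count_ is serialized as a JSON string ("20839" rather than 20839), because the mixed string/long row arrays were inferred as array<string> on read. If numeric values are needed, a small sketch that casts before serializing:

// Cast count_ to a numeric type so to_json emits numbers rather than strings.
val result_json_typed = result
  .withColumn("count_", $"count_".cast("long"))
  .agg(to_json(collect_list(struct("Category", "count_"))).as("json"))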

Or you can save it directly as JSON, e.g. result.write.json("output").
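
Keep in mind that write.json produces a directory of part files rather than a single file; if one file is wanted, you can coalesce first (a sketch):

// Collapse to a single partition so the output directory holds one part file.
result.coalesce(1).write.mode("overwrite").json("output")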


Answer 2 (mcdcgff0)

Another approach, using the transform higher-order function (Spark 2.4+):

import org.apache.spark.sql.functions._

val df = spark.read.option("multiline", true).json(inPath)

val df1 = df.withColumn("tables", explode($"tables"))
            .select($"tables.rows".as("rows"))
            // turn each inner [value, value] array into a named struct, then expand it
            .select(expr("inline(transform(rows, x -> struct(x[0] as Category, x[1] as count_)))"))

df1.show
//+--------------+------+
//|      Category|count_|
//+--------------+------+
//|Administrative| 20839|
//|Recommendation|   122|
//|         Alert|    64|
//| ServiceHealth|    11|
//+--------------+------+

Then save it to a JSON file:

df1.write.json(outPath)
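
The question also mentions ideally using the columns array as the headers rather than hardcoding them. A sketch of that generalization (assumes Spark 2.4+ for transform, a single table per document, and the same df as above):

import spark.implicits._

// Pull the declared column names, in order, from the first table's metadata.
val headers: Seq[String] = df.selectExpr("tables[0].columns.name").as[Seq[String]].head

// Build "struct(x[0] as `Category`, x[1] as `count_`, ...)" from those names.
val structExpr = headers.zipWithIndex
  .map { case (name, i) => s"x[$i] as `$name`" }
  .mkString("struct(", ", ", ")")

val dynamic = df.withColumn("tables", explode($"tables"))
  .select($"tables.rows".as("rows"))
  .selectExpr(s"inline(transform(rows, x -> $structExpr))")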
