Unnest a complex PySpark schema

a9wyjsp7 · published 2021-05-29 in Spark

I need help unnesting a DataFrame into a specific format.
The data is somewhat complex, as shown below:

[{
    "ItemType": "CONSTRUCTION",
    "ItemId": "9169-bd62eac18e73",
    "Content": {
        "MetadataSetList": [
            {
                "SetId": "privacy-metadata-set",
                "MetadataList": [
                    {
                        "MetadataValue": "true",
                        "MetadataId": "Public"
                    }
                ]
            },
            {
                "SetId": "asset-metadata-set",
                "MetadataList": [
                    {
                        "MetadataValue": "new upload & edit test",
                        "MetadataId": "title"
                    },
                    {
                        "MetadataValue": "someone",
                        "MetadataId": "uploader"
                    },
                    {
                        "MetadataValue": "One,Five",
                        "MetadataId": "Families"
                    },
                    {
                        "MetadataValue": "@xyzzzzz",
                        "MetadataId": "creator"
                    }
                ]
            }
        ],
        "MetadataType": "UNDER CONSTRUCTION",
        "Tenant": "8ef4-0e976f342606"
    },
    "Version":"1.0",
    "IsActive":False,
    "Status":"DEPRECATED"
}]

My requirement is to transform the record above so that the data inside "Content" is unnested: each "MetadataId" value becomes a new key, and the corresponding "MetadataValue" becomes that key's value:

[
        {
            "status": "DEPRECATED",
            "version": "1.0",
            "item_type": "CONSTRUCTION",
            "item_id": "9169-bd62eac18e73",
            "is_active":False,
            "content": {
                "Public": "true", 
                "title": "new upload & edit test",
                "uploader": "someone",
                "Families": "One,Five",
                "creator": "@xyzzzzz"
            },
            "metadata_type": "UNDER CONSTRUCTION",
            "tenant": "8ef4-0e976f342606"
        }
    ]

I have the following code to transform the data in a Python script:


# Data Transformation Function

def transform_data(docs):
    """Flatten each document's nested Content metadata into a flat dict."""
    unnested_table_items = []

    for doc in docs:
        NewDoc = {}

        NewDoc['status'] = doc['Status']
        NewDoc['version'] = doc['Version']
        NewDoc['item_type'] = doc['ItemType']
        NewDoc['item_id'] = doc['ItemId']
        NewDoc['is_active'] = doc['IsActive']
        content_dict = {}
        for row in doc['Content']['MetadataSetList']:
            for entry in row['MetadataList']:
                content_dict[entry['MetadataId']] = entry['MetadataValue']

        NewDoc['content'] = content_dict
        NewDoc['metadata_type'] = doc['Content']['MetadataType']
        NewDoc['tenant'] = doc['Content']['Tenant']

        unnested_table_items.append(NewDoc)

    return unnested_table_items
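
For reference, here is a condensed, self-contained version of the function above, run against a trimmed-down sample record (the two-set sample below is shortened from the full input for brevity):

```python
def transform_data(docs):
    """Flatten each document's nested Content metadata into a flat dict."""
    unnested = []
    for doc in docs:
        content = {}
        for metadata_set in doc["Content"]["MetadataSetList"]:
            for entry in metadata_set["MetadataList"]:
                content[entry["MetadataId"]] = entry["MetadataValue"]
        unnested.append({
            "status": doc["Status"],
            "version": doc["Version"],
            "item_type": doc["ItemType"],
            "item_id": doc["ItemId"],
            "is_active": doc["IsActive"],
            "content": content,
            "metadata_type": doc["Content"]["MetadataType"],
            "tenant": doc["Content"]["Tenant"],
        })
    return unnested

sample = [{
    "ItemType": "CONSTRUCTION",
    "ItemId": "9169-bd62eac18e73",
    "Content": {
        "MetadataSetList": [
            {"SetId": "privacy-metadata-set",
             "MetadataList": [{"MetadataValue": "true", "MetadataId": "Public"}]},
            {"SetId": "asset-metadata-set",
             "MetadataList": [{"MetadataValue": "someone", "MetadataId": "uploader"}]},
        ],
        "MetadataType": "UNDER CONSTRUCTION",
        "Tenant": "8ef4-0e976f342606",
    },
    "Version": "1.0",
    "IsActive": False,
    "Status": "DEPRECATED",
}]

result = transform_data(sample)
print(result[0]["content"])  # {'Public': 'true', 'uploader': 'someone'}
```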

Now, the data is stored in S3 and needs to be processed with AWS Glue. When I create a DynamicFrame that reads the data from S3, the schema structure is:

root
|-- ItemType: string
|-- ItemId: string
|-- Content: struct
|    |-- MetadataSetList: array
|    |    |-- element: struct
|    |    |    |-- SetId: string
|    |    |    |-- MetadataList: array
|    |    |    |    |-- element: struct
|    |    |    |    |    |-- MetadataValue: string
|    |    |    |    |    |-- MetadataId: string
|    |-- MetadataType: string
|    |-- Tenant: string
|-- Version: string
|-- IsActive: string
|-- Status: string
+------------+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+----------+
|ItemType    |ItemId           |Content                                                                                                                                                                                                     |Version|IsActive|Status    |
+------------+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+----------+
|CONSTRUCTION|9169-bd62eac18e73|[[[privacy-metadata-set, [[true, Public]]], [asset-metadata-set, [[new upload & edit test, title], [someone, uploader], [One,Five, Families], [@xyzzzzz, creator]]]], UNDER CONSTRUCTION, 8ef4-0e976f342606]|1.0    |False   |DEPRECATED|
+------------+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+----------+

Given this schema structure, how can I transform it to get the same output the Python function produces?
Any help would be appreciated. Thank you.

2nc8po8w (Answer 1)

Thanks for the help. I ended up creating a UDF for the transformation.

from pyspark.sql import functions, types

def content_transform(x):
    content_dict = {}
    for row in x:
        for entry in row['MetadataList']:
            content_dict[entry['MetadataId']] = entry['MetadataValue']
    return str(content_dict)

def content_udf():
    return functions.udf(content_transform, types.StringType())

df2 = (df1
       .withColumn("ContentUnnested",
                   content_udf()(functions.col("Content.MetadataSetList"))))

After converting it to StringType, I used the Unbox class from AWS Glue to unbox the string back into JSON.
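
The UDF body itself is plain Python, so it can be sanity-checked outside Spark on a list of dicts shaped like the MetadataSetList rows (a trimmed sample, not the full record):

```python
def content_transform(x):
    # Merge every MetadataId/MetadataValue pair across all sets into one
    # dict, returned as a string so the UDF yields a plain StringType column.
    content_dict = {}
    for row in x:
        for entry in row["MetadataList"]:
            content_dict[entry["MetadataId"]] = entry["MetadataValue"]
    return str(content_dict)

sets = [
    {"SetId": "privacy-metadata-set",
     "MetadataList": [{"MetadataId": "Public", "MetadataValue": "true"}]},
    {"SetId": "asset-metadata-set",
     "MetadataList": [{"MetadataId": "title", "MetadataValue": "new upload & edit test"}]},
]
print(content_transform(sets))  # {'Public': 'true', 'title': 'new upload & edit test'}
```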

xzlaal3s (Answer 2)

I tried solving this in Scala, but the same functions are available in the PySpark API:

1. Load the data

val data =
      """
        |[{
        |    "ItemType": "CONSTRUCTION",
        |    "ItemId": "9169-bd62eac18e73",
        |    "Content": {
        |        "MetadataSetList": [
        |            {
        |                "SetId": "privacy-metadata-set",
        |                "MetadataList": [
        |                    {
        |                        "MetadataValue": "true",
        |                        "MetadataId": "Public"
        |                    }
        |                ]
        |            },
        |            {
        |                "SetId": "asset-metadata-set",
        |                "MetadataList": [
        |                    {
        |                        "MetadataValue": "new upload & edit test",
        |                        "MetadataId": "title"
        |                    },
        |                    {
        |                        "MetadataValue": "someone",
        |                        "MetadataId": "uploader"
        |                    },
        |                    {
        |                        "MetadataValue": "One,Five",
        |                        "MetadataId": "Families"
        |                    },
        |                    {
        |                        "MetadataValue": "@xyzzzzz",
        |                        "MetadataId": "creator"
        |                    }
        |                ]
        |            }
        |        ],
        |        "MetadataType": "UNDER CONSTRUCTION",
        |        "Tenant": "8ef4-0e976f342606"
        |    },
        |    "Version":"1.0",
        |    "IsActive":false,
        |    "Status":"DEPRECATED"
        |}]
      """.stripMargin

    import spark.implicits._

    val df = spark.read
      .option("multiline", true)
      .json(Seq(data).toDS())
    df.show(false)
    df.printSchema()

    /**
      * +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------+------------+----------+-------+
      * |Content                                                                                                                                                                                                     |IsActive|ItemId           |ItemType    |Status    |Version|
      * +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------+------------+----------+-------+
      * |[[[[[Public, true]], privacy-metadata-set], [[[title, new upload & edit test], [uploader, someone], [Families, One,Five], [creator, @xyzzzzz]], asset-metadata-set]], UNDER CONSTRUCTION, 8ef4-0e976f342606]|false   |9169-bd62eac18e73|CONSTRUCTION|DEPRECATED|1.0    |
      * +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------+------------+----------+-------+
      *
      * root
      * |-- Content: struct (nullable = true)
      * |    |-- MetadataSetList: array (nullable = true)
      * |    |    |-- element: struct (containsNull = true)
      * |    |    |    |-- MetadataList: array (nullable = true)
      * |    |    |    |    |-- element: struct (containsNull = true)
      * |    |    |    |    |    |-- MetadataId: string (nullable = true)
      * |    |    |    |    |    |-- MetadataValue: string (nullable = true)
      * |    |    |    |-- SetId: string (nullable = true)
      * |    |-- MetadataType: string (nullable = true)
      * |    |-- Tenant: string (nullable = true)
      * |-- IsActive: boolean (nullable = true)
      * |-- ItemId: string (nullable = true)
      * |-- ItemType: string (nullable = true)
      * |-- Status: string (nullable = true)
      * |-- Version: string (nullable = true)
      */

2. Process the data

import scala.collection.mutable
import org.apache.spark.sql.functions._

// Merge an array of single-entry maps into one map
val mergeMap = udf((arr: mutable.WrappedArray[Map[String, String]]) =>
  arr.foldLeft(Map.empty[String, String])(_ ++ _))

   val processedDF = df.select(col("IsActive").as("is_active"),
      col("ItemId").as("item_id"),
      col("ItemType").as("item_type"),
      col("Status").as("status"),
      col("Version").as("version"),
      col("Content.MetadataType").as("metadata_type"),
      col("Content.Tenant").as("tenant"),
      col("Content.MetadataSetList").getItem(0).getField("MetadataList").as("content1"),
      col("Content.MetadataSetList").getItem(1).getField("MetadataList").as("content2")
    ).withColumn("content",
      array_union(
        col("content1"),
        col("content2")
      )
    )
      .withColumn("content", expr("TRANSFORM(content, x -> map(x.MetadataId, x.MetadataValue))"))
     .withColumn("content", mergeMap(col("content")))
      .drop("content1", "content2")

    processedDF.show(false)
    processedDF.printSchema()

    /**
      * +---------+-----------------+------------+----------+-------+------------------+-----------------+-----------------------------------------------------------------------------------------------------------------+
      * |is_active|item_id          |item_type   |status    |version|metadata_type     |tenant           |content                                                                                                          |
      * +---------+-----------------+------------+----------+-------+------------------+-----------------+-----------------------------------------------------------------------------------------------------------------+
      * |false    |9169-bd62eac18e73|CONSTRUCTION|DEPRECATED|1.0    |UNDER CONSTRUCTION|8ef4-0e976f342606|[Families -> One,Five, Public -> true, creator -> @xyzzzzz, title -> new upload & edit test, uploader -> someone]|
      * +---------+-----------------+------------+----------+-------+------------------+-----------------+-----------------------------------------------------------------------------------------------------------------+
      *
      * root
      * |-- is_active: boolean (nullable = true)
      * |-- item_id: string (nullable = true)
      * |-- item_type: string (nullable = true)
      * |-- status: string (nullable = true)
      * |-- version: string (nullable = true)
      * |-- metadata_type: string (nullable = true)
      * |-- tenant: string (nullable = true)
      * |-- content: map (nullable = true)
      * |    |-- key: string
      * |    |-- value: string (valueContainsNull = true)
      */

3. Convert the DataFrame to JSON

processedDF.toJSON
      .show(false)

//    {
//      "is_active": false,
//      "item_id": "9169-bd62eac18e73",
//      "item_type": "CONSTRUCTION",
//      "status": "DEPRECATED",
//      "version": "1.0",
//      "metadata_type": "UNDER CONSTRUCTION",
//      "tenant": "8ef4-0e976f342606",
//      "content": {
//        "Public": "true",
//        "Families": "One,Five",
//        "creator": "@xyzzzzz",
//        "uploader": "someone",
//        "title": "new upload & edit test"
//      }
//    }
