I need help unnesting a DataFrame into a specific format.
The data is somewhat complex, as shown below:
[{
  "ItemType": "CONSTRUCTION",
  "ItemId": "9169-bd62eac18e73",
  "Content": {
    "MetadataSetList": [
      {
        "SetId": "privacy-metadata-set",
        "MetadataList": [
          {
            "MetadataValue": "true",
            "MetadataId": "Public"
          }
        ]
      },
      {
        "SetId": "asset-metadata-set",
        "MetadataList": [
          {
            "MetadataValue": "new upload & edit test",
            "MetadataId": "title"
          },
          {
            "MetadataValue": "someone",
            "MetadataId": "uploader"
          },
          {
            "MetadataValue": "One,Five",
            "MetadataId": "Families"
          },
          {
            "MetadataValue": "@xyzzzzz",
            "MetadataId": "creator"
          }
        ]
      }
    ],
    "MetadataType": "UNDER CONSTRUCTION",
    "Tenant": "8ef4-0e976f342606"
  },
  "Version": "1.0",
  "IsActive": False,
  "Status": "DEPRECATED"
}]
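My requirement is to modify the record above so that the data inside "Content" is unnested: each "MetadataId" value becomes a new key, and the matching "MetadataValue" becomes that key's value: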
[
  {
    "status": "DEPRECATED",
    "version": "1.0",
    "item_type": "CONSTRUCTION",
    "item_id": "9169-bd62eac18e73",
    "is_active": False,
    "content": {
      "Public": "true",
      "title": "new upload & edit test",
      "uploader": "someone",
      "Families": "One,Five",
      "creator": "@xyzzzzz"
    },
    "metadata_type": "UNDER CONSTRUCTION",
    "tenant": "8ef4-0e976f342606"
  }
]
I have the code below, which transforms the data in a plain Python script:
# Data Transformation Function
def transform_data(docs):
    """Flatten each document's Content.MetadataSetList into one 'content' dict."""
    unnested_table_items = []
    for doc in docs:
        new_doc = {}
        new_doc['status'] = doc['Status']
        new_doc['version'] = doc['Version']
        new_doc['item_type'] = doc['ItemType']
        new_doc['item_id'] = doc['ItemId']
        new_doc['is_active'] = doc['IsActive']
        # Collapse every metadata set into a single {MetadataId: MetadataValue} dict.
        content_dict = {}
        for metadata_set in doc['Content']['MetadataSetList']:
            for entry in metadata_set['MetadataList']:
                content_dict[entry['MetadataId']] = entry['MetadataValue']
        new_doc['content'] = content_dict
        new_doc['metadata_type'] = doc['Content']['MetadataType']
        new_doc['tenant'] = doc['Content']['Tenant']
        unnested_table_items.append(new_doc)
    return unnested_table_items
The data is now stored in S3 and has to be processed with AWS Glue. When I create a DynamicFrame that reads the data from S3, the schema is:
root
|-- ItemType: string
|-- ItemId: string
|-- Content: struct
| |-- MetadataSetList: array
| | |-- element: struct
| | | |-- SetId: string
| | | |-- MetadataList: array
| | | | |-- element: struct
| | | | | |-- MetadataValue: string
| | | | | |-- MetadataId: string
| |-- MetadataType: string
| |-- Tenant: string
|-- Version: string
|-- IsActive: string
|-- Status: string
+------------+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+----------+
|ItemType |ItemId |Content |Version|IsActive|Status |
+------------+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+----------+
|CONSTRUCTION|9169-bd62eac18e73|[[[privacy-metadata-set, [[true, Public]]], [asset-metadata-set, [[new upload & edit test, title], [someone, uploader], [One,Five, Families], [@xyzzzzz, creator]]]], UNDER CONSTRUCTION, 8ef4-0e976f342606]|1.0 |False |DEPRECATED|
+------------+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+--------+----------+
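Given this schema, how can I transform the data so that it ends up in the form produced by the Python function above?
Any help would be appreciated. Thank you.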
2 Answers
2nc8po8w1#
Thanks for the help. I ended up creating a UDF for the transformation.
After converting the result to StringType, I used the Unbox class from AWS Glue to unbox the JSON.
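The answer does not show its UDF, so the following is only a minimal sketch of what the body of such a function might look like, assuming the "Content" column has first been serialized to a JSON string. The name flatten_content and the sample payload are hypothetical and not taken from the answer.

```python
import json


def flatten_content(content_json):
    """Return a JSON string mapping each MetadataId to its MetadataValue.

    Hypothetical helper: takes the "Content" struct serialized as a JSON
    string and returns a flat JSON object, also as a string.
    """
    if content_json is None:
        return None
    content = json.loads(content_json)
    flat = {}
    # Walk every metadata set and merge all entries into one dict.
    # If the same MetadataId appears twice, the last value wins.
    for metadata_set in content.get("MetadataSetList") or []:
        for entry in metadata_set.get("MetadataList") or []:
            flat[entry["MetadataId"]] = entry["MetadataValue"]
    return json.dumps(flat)


# Sample payload shaped like the "Content" field in the question.
sample = json.dumps({
    "MetadataSetList": [
        {"SetId": "privacy-metadata-set",
         "MetadataList": [{"MetadataValue": "true", "MetadataId": "Public"}]},
        {"SetId": "asset-metadata-set",
         "MetadataList": [
             {"MetadataValue": "new upload & edit test", "MetadataId": "title"},
             {"MetadataValue": "someone", "MetadataId": "uploader"},
         ]},
    ],
    "MetadataType": "UNDER CONSTRUCTION",
    "Tenant": "8ef4-0e976f342606",
})
result = json.loads(flatten_content(sample))
```

In a Glue job, a function like this could presumably be wrapped with pyspark.sql.functions.udf(flatten_content, StringType()) and applied to to_json(col("Content")). The resulting string column is the kind of input that Unbox.apply(frame=..., path="content", format="json") is meant to parse back into a struct.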
xzlaal3s2#
I tried solving this in Scala, but the functions used are also available in the PySpark API:
1. Load the data
2. Process the data
3. Convert the DataFrame to JSON
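The code for the three steps is missing from this answer, so here is a rough PySpark sketch of how they might look. It is not the answerer's original code. It assumes Spark 2.4 or later (for flatten, transform and map_from_entries), an existing SparkSession passed in as spark, and input supplied as a list of JSON strings. The names CONTENT_EXPR and flatten_records are made up for illustration.

```python
# Spark SQL expression that turns the nested metadata into a map column:
#   Content.MetadataSetList.MetadataList -> array<array<struct>>
#   flatten(...)                         -> array<struct>
#   transform(...)                       -> array<struct<key, value>>
#   map_from_entries(...)                -> map<string, string>
CONTENT_EXPR = (
    "map_from_entries(transform("
    "flatten(Content.MetadataSetList.MetadataList), "
    "m -> struct(m.MetadataId, m.MetadataValue)))"
)


def flatten_records(spark, json_lines):
    """Hypothetical helper: returns one flattened JSON string per input record."""
    # Imported lazily so this module can be loaded without pyspark installed.
    from pyspark.sql import functions as F

    # 1. Load the data (here from in-memory JSON strings; in a Glue job the
    #    DataFrame would more likely come from dynamic_frame.toDF()).
    df = spark.read.json(spark.sparkContext.parallelize(json_lines))

    # 2. Process the data: rename top-level fields and build the content map.
    flat = df.select(
        F.col("Status").alias("status"),
        F.col("Version").alias("version"),
        F.col("ItemType").alias("item_type"),
        F.col("ItemId").alias("item_id"),
        F.col("IsActive").alias("is_active"),
        F.expr(CONTENT_EXPR).alias("content"),
        F.col("Content.MetadataType").alias("metadata_type"),
        F.col("Content.Tenant").alias("tenant"),
    )

    # 3. Convert the DataFrame to JSON, one string per row.
    return flat.toJSON().collect()
```

The explicit struct(m.MetadataId, m.MetadataValue) matters because map_from_entries treats the first struct field as the key, and the schema in the question lists MetadataValue first. Unlike the plain-Python loop, where a repeated MetadataId is overwritten by the later value, Spark 3 by default raises an error on duplicate map keys, so repeated ids would need separate handling.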