使用pyspark从json数据获取配置单元表

gorkyyrv  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(318)

我想从json数据创建一个平面配置单元表,json数据来自另一个配置单元表(放在一列event\u数据中)。下面是json数据结构。我已经使用稍后的视图创建了配置单元表,但是现在我想使用pyspark和一些自定义项来创建配置单元表。

'{"callId":"0000000","journey":{"channel":{"out":"sssss@icloud.com","outbound":"EMAIL"}},"application":{"componentId":"23456","name":"dfgt-ghy-svc","applicationReferenceId":"SRFC98756RD"},"servicingDetail":{"offerAttributes":{"id":"ADLC0110000"},"offerCommunicationAttributes":{"id":"CFRGTV10098","status":"SUCCESS"}},"customerInfo":{"calledInAccount":"ERFCVDG9801"},"correlationId":"9845-sd76-sdfr87","fulfiller":{"id":"DEFC1009","category":"TST","entity":"colleague"},"platform":{"name":"v-generation","id":"37664859"}}'

我只需要4列需要提取哪些是 callId , correlationId , servicingDetail -offerAttributes-id , fulfiller-id 请帮我做同样的事。

gdx19jrr

gdx19jrr1#

首先,在配置单元中创建输出。
然后,使用get_json_object()函数从单列表中选择数据,并将该数据插入到最终的表中。请参阅get\u json\u object()。

INSERT INTO table OutputTable 
SELECT 
    get_json_object(event_data,'$.callId') as callId, 
    get_json_object(event_data,'$.correlationId') as correlationId,
    get_json_object(event_data,'$.servicingDetail.offerAttributes.id') as servicingDetail_offerAttributes_id,
    get_json_object(event_data,'$.fulfiller.id') as fulfiller_id,
FROM SingleColumnTable;

第一次编辑-Pypark解决方案

将单列表读入dataframe(假设df name是dfsinglecolumneddata),然后应用下面的逻辑来获取每一列的数据。最后选择所需的列。

from pyspark.sql import Row
from collections import OrderedDict

def convert_to_row(d: dict) -> Row:
    return Row(**OrderedDict(sorted(d.items())))

dfSingleColumnedData.rdd.map(convert_to_row).toDF()

相关问题