PySpark: exploding a JSON string

xt0899hw · published 2021-05-27 in Spark

Input DataFrame

id  name     collection
111 aaaaa    {"1":{"city":"city_1","state":"state_1","country":"country_1"},
              "2":{"city":"city_2","state":"state_2","country":"country_2"},
              "3":{"city":"city_3","state":"state_3","country":"country_3"}
             }
222 bbbbb    {"1":{"city":"city_1","state":"state_1","country":"country_1"},
              "2":{"city":"city_2","state":"state_2","country":"country_2"},
              "3":{"city":"city_3","state":"state_3","country":"country_3"}
              }

where:

id ==> string
name ==> string
collection ==> string (string representation of JSON_data)

I want something like this:
Output DataFrame

id  name   key  value
111 aaaaa  "1"  {"city":"city_1","state":"state_1","country":"country_1"},
111 aaaaa  "2"  {"city":"city_2","state":"state_2","country":"country_2"},
111 aaaaa  "3"  {"city":"city_3","state":"state_3","country":"country_3"}             
222 bbbbb  "1"  {"city":"city_1","state":"state_1","country":"country_1"},
222 bbbbb  "2"  {"city":"city_2","state":"state_2","country":"country_2"},
222 bbbbb  "3"  {"city":"city_3","state":"state_3","country":"country_3"}

If my collection column were of type map or array, the explode function would do the job, but collection is of string type (a string representation of JSON data).
How do I get the output DataFrame? Please advise.
Note that the collection column may have a nested and unpredictable schema, for example:

{
  "1":{"city":"city_1","state":"state_1","country":"country_1"},
  "2":{"city":"city_2","state":"state_2","country":"country_2","a":{"aa":"111"}},
  "3":{"city":"city_3","state":"state_3"}
}

y53ybaqx1#

There is a function for that: from_json. It will parse your string, and then you can use explode.
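
A minimal sketch of that idea (my own illustration, not the answerer's exact code): parsing the string as map<string,string> keeps each value as raw JSON text, which also copes with the nested, unpredictable inner schema.

import pyspark.sql.functions as f
from pyspark.sql.types import MapType, StringType

# Parse the JSON string into map<string,string>; values that are JSON
# objects are kept as their raw JSON text, so the inner schema may vary.
parsed = df.withColumn(
    'collection',
    f.from_json('collection', MapType(StringType(), StringType()))
)

# explode on a map column yields one row per entry, as key/value columns.
parsed.select('id', 'name', f.explode('collection').alias('key', 'value')) \
      .show(truncate=False)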


lb3vh1jj2#

Give a JSON schema and parse the string into a struct column; then zip the struct's field names with its values and explode.

import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType

# Schema for the known top-level keys "1", "2", "3"; each maps to a
# city/state/country struct.
schema = StructType([
    StructField('1', StructType([
        StructField('city', StringType(), True),
        StructField('state', StringType(), True),
        StructField('country', StringType(), True),
    ]), True),
    StructField('2', StructType([
        StructField('city', StringType(), True),
        StructField('state', StringType(), True),
        StructField('country', StringType(), True),
    ]), True),
    StructField('3', StructType([
        StructField('city', StringType(), True),
        StructField('state', StringType(), True),
        StructField('country', StringType(), True),
    ]), True),
])

# Parse the JSON string into a struct column and collect its field names.
df2 = df.withColumn('collection', f.from_json('collection', schema))
cols = df2.select('collection.*').columns

# Zip the field names with the corresponding struct values, explode the
# zipped array into one row per entry, and split it into key/value columns.
df2.withColumn('collection', f.arrays_zip(f.array(*[f.lit(c) for c in cols]), f.array('collection.*'))) \
   .withColumn('collection', f.explode('collection')) \
   .withColumn('key', f.col('collection.0')) \
   .withColumn('value', f.col('collection.1')) \
   .drop('collection').show(10, False)

+---+-----+---+----------------------------+
|id |name |key|value                       |
+---+-----+---+----------------------------+
|111|aaaaa|1  |[city_1, state_1, country_1]|
|111|aaaaa|2  |[city_2, state_2, country_2]|
|111|aaaaa|3  |[city_3, state_3, country_3]|
|222|bbbbb|1  |[city_1, state_1, country_1]|
|222|bbbbb|2  |[city_2, state_2, country_2]|
|222|bbbbb|3  |[city_3, state_3, country_3]|
+---+-----+---+----------------------------+
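
A hedged follow-up sketch, reusing df2 and cols from above: the desired output shows the value column as a JSON string rather than a struct, and wrapping each struct with to_json before zipping gives that.

# Assumption: df2 and cols are as defined in the snippet above.
df2.withColumn('collection',
               f.arrays_zip(f.array(*[f.lit(c) for c in cols]),
                            f.array(*[f.to_json(f.col('collection.' + c)) for c in cols]))) \
   .withColumn('collection', f.explode('collection')) \
   .select('id', 'name',
           f.col('collection.0').alias('key'),
           f.col('collection.1').alias('value')) \
   .show(10, False)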

xj3cbfub3#

Here is a hacky solution (not ideal, since it drops down to the underlying RDD), but I have tested it in scenarios with inconsistent schemas and it looks robust:

from pyspark.sql import Row

rdd1 = df.rdd

# Replace the `collection` string with the Python object it encodes,
# leave the other columns untouched, then rebuild Rows and convert back
# to a DataFrame (the dict becomes a map column).
rdd2 = rdd1.map(lambda x: [(key, val) if key != 'collection' else (key, eval(val))
                           for key, val in x.asDict().items()]) \
           .map(lambda x: Row(**dict(x)))

rdd2.toDF().show()
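
A safer variant, as a sketch of my own rather than the answerer's code: json.loads avoids eval, and re-serializing each inner object keeps the map values as strings, so the result can be exploded directly.

import json
from pyspark.sql import Row
import pyspark.sql.functions as f

# Parse the collection string with json.loads (safer than eval) and keep
# each inner object as a JSON string; dicts are inferred as map columns.
parsed = df.rdd.map(lambda row: Row(
    id=row['id'],
    name=row['name'],
    collection={k: json.dumps(v) for k, v in json.loads(row['collection']).items()}
)).toDF()

# explode on the map column yields the desired key/value rows.
parsed.select('id', 'name', f.explode('collection').alias('key', 'value')) \
      .show(truncate=False)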
