pyspark:将json扩展到新列

qzwqbdag  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(497)

我有以下数据:

+-----------+-----------+-------------------------------------------------------------+
|         id|      point|                         data                                |
+-------------------------------------------------------------------------------------+
|        dfb|          6|[{"key1":"124", "key2": "345"},{"key3":"324", "key1":"wfe"}] |
|        bgd|          7|[{"key3":"324", "key1":"wfe"},{"key1":"777", "key2":"888"}]  |
|        34d|          6|[{"key1":"111", "key4": "788", "key2":"dfef}]                |

我想把它转换成

+-----------+-----------+-----------------------------------------------+
|         id|      point|                         key1                  |
+-----------------------------------------------------------------------+
|        dfb|          6|    124                                        |     
|        bgd|          7|    777                                        |
|        34d|          6|    111                                        |

存在一个json列表,它们可能共享公共键,但我想从同样有key2的json中提取key1的值。
这在python中很容易实现。
在pyspark中,我看到了基于固定模式的解决方案(如何在pyspark中将列表拆分为多列?),但是在没有固定模式的情况下,我如何实现这一点呢。

e0uiprwp

e0uiprwp1#

检查以下代码。

from pyspark.sql import functions as F
from pyspark.sql.types import *
df.show()
+---+-----+---------------------------------------------------------+
|id |point|data                                                     |
+---+-----+---------------------------------------------------------+
|dfb|6    |[{"key1":"124","key2":"345"},{"key3":"324","key1":"wfe"}]|
|bgd|7    |[{"key3":"324","key1":"wfe"},{"key1":"777","key2":"888"}]|
|34d|6    |[{"key1":"111","key4":"788","key2":"dfef"}]              |
+---+-----+---------------------------------------------------------+
schema = ArrayType(MapType(StringType(),StringType()))
df.withColumn("data",F.explode(F.from_json(F.col("data"),schema))).withColumn("data",F.when(F.col("data")["key1"].cast("long").isNotNull(),F.col("data")["key1"])).filter(F.col("data").isNotNull()).show()

+---+-----+----+
| id|point|data|
+---+-----+----+
|dfb|    6| 124|
|bgd|    7| 777|
|34d|    6| 111|
+---+-----+----+
qyuhtwio

qyuhtwio2#

另一种方法是 higher_order_functions (Spark2.4+)涉及 filtertransform 可以是:

import pyspark.sql.functions as F

schema = ArrayType(MapType(StringType(),StringType()))

(df.withColumn("data",F.from_json(F.col("data"),schema))
   .withColumn("Key1",F.expr('''transform(filter(data,x-> 
   array_contains(map_keys(x),"key2")),y->y["key1"])''')[0])).show()
+---+-----+--------------------+----+
| id|point|                data|Key1|
+---+-----+--------------------+----+
|dfb|    6|[[key1 -> 124, ke...| 124|
|bgd|    7|[[key3 -> 324, ke...| 777|
|34d|    6|[[key1 -> 111, ke...| 111|
+---+-----+--------------------+----+

相关问题