Creating JSON in a PySpark DataFrame

fslejnso asked on 2021-07-12 in Spark

I have a DataFrame in PySpark:

|id |Name   |Age |
|-- |------ |--- |
|1  |John   |31  |
|2  |Sam    |34  |
|3  |Chris  |28  |

I need to transform it into the following DataFrame:

|id         |       Name        |       Age     |
| ------    | ----------------  | -----------   |
|{'v':1}    |   {'v':'John'}    |   {'v':31}    |
|{'v':2}    |   {'v':'Sam'}     |   {'v':34}    |
|{'v':3}    |   {'v':'Chris'}   |   {'v':28}    |

This code works with a pandas DataFrame, but PySpark DataFrames have no such apply method:

df = df.apply(lambda x: [{'v': i} for i in x], axis=0)

What is the most efficient way to do this in PySpark? There are 50+ columns and about 10 such tables.

eimct9ow 1#

You can use create_map to build a map column for each column:

from pyspark.sql import functions as F

df1 = df.select(*[F.create_map(F.lit("v"), F.col(c)).alias(c) for c in df.columns])

df1.show()
+--------+------------+---------+
|      id|        Name|      Age|
+--------+------------+---------+
|[v -> 1]| [v -> John]|[v -> 31]|
|[v -> 2]|  [v -> Sam]|[v -> 34]|
|[v -> 3]|[v -> Chris]|[v -> 28]|
+--------+------------+---------+

Then use to_json to turn each map into a JSON string:

df1.select(*[F.to_json(c).alias(c) for c in df1.columns]).show()

# +-------+-------------+--------+
# |     id|         Name|     Age|
# +-------+-------------+--------+
# |{"v":1}| {"v":"John"}|{"v":31}|
# |{"v":2}|  {"v":"Sam"}|{"v":34}|
# |{"v":3}|{"v":"Chris"}|{"v":28}|
# +-------+-------------+--------+
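
Since only the JSON strings are needed, both steps can also be fused into a single projection, which avoids materializing the intermediate map columns; a minimal sketch, assuming the same df and imports as above:

# Build {"v": <value>} as a map and serialize it to JSON in one pass per column
df_json = df.select(*[F.to_json(F.create_map(F.lit("v"), F.col(c))).alias(c) for c in df.columns])

This scales to any number of columns, since the list comprehension generates one expression per column inside a single select.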

lrpiutwd 2#

You can wrap each column in a struct with a single field named value, then convert it to JSON:

import pyspark.sql.functions as F

df2 = df.select([F.to_json(F.struct(F.col(c).alias('value'))).alias(c) for c in df.columns])

df2.show()
+-------------+-----------------+--------------+
|           id|             Name|           Age|
+-------------+-----------------+--------------+
|{"value":"1"}| {"value":"John"}|{"value":"31"}|
|{"value":"2"}|  {"value":"Sam"}|{"value":"34"}|
|{"value":"3"}|{"value":"Chris"}|{"value":"28"}|
+-------------+-----------------+--------------+
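
To apply the same transformation to the ~10 tables mentioned in the question, the per-column logic can be factored into a reusable helper; a minimal sketch, where wrap_values and its field parameter are hypothetical names:

import pyspark.sql.functions as F

def wrap_values(df, field="value"):
    # Wrap every column of df as a JSON string of the form {field: <value>}.
    return df.select(*[F.to_json(F.struct(F.col(c).alias(field))).alias(c) for c in df.columns])

# Reuse across tables, e.g.:
# wrapped = [wrap_values(t) for t in tables]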
