如何使用pyspark从spark dataframe中的Column中读取JSON作为字符串或字典,而不使用RDD或collect()?

b4qexyjb  于 2023-05-19  发布在  Spark
关注(0)|答案(2)|浏览(198)

我有一个像这样的数据框

+-------------------------------------------+
|                                     output|
+-------------------------------------------+
|{"COLUMN1": "123", "COUMN2": {"A":1 "B":2}}|
+-------------------------------------------+

我只是想把JSON作为一个字符串或字典读到变量中,这样我就可以对它做进一步的操作。
问题是-

Row(output=Row(COLUMN1='123', ...

DF是如何创建的?

nextdf = df.select(struct(col("COLUMN1"),col("COLUMN2"),col("COLUMN3")).alias("output"))

输出应为-{"COLUMN1": "123", "COUMN2": {"A":1 "B":2}}
请让我知道我可以尝试什么?

dtcbnfnu

dtcbnfnu1#

这种情况下可以使用**toJSON()**。

Example:

from pyspark.sql.functions import *
df = spark.createDataFrame([(1, "foo"),(2, "bar"),],["id", "label"])

df1= df.withColumn("temp", concat_ws(" ", *df.columns)).groupBy(lit(1)).agg(array_join(collect_list(col("temp"))," ").alias("new_column")).drop("1")

print(df1.select(struct(col("new_column")).alias("new")).toJSON().collect()[0])
#{"new":{"new_column":"1 foo 2 bar"}}

得到东西而不收集然后使用

Save as Text:

使用**save作为text文件,带有header**标志false,从输出文件中转义列名。

df.coalesce(1).write.format("text").option("header", "false").save("output.txt")
7hiiyaii

7hiiyaii2#

RDD是旧接口,DataFrame是新/替换接口。使用DataFrame方法。

my_var = nextdf.collect()[0].asDict()['output']
print(my_var)
# should print `{"COLUMN1": "123", "COUMN2": {"A":1 "B":2}}`

相关问题