How to parse a JSON string of nested lists into a Spark DataFrame in PySpark?

jjjwad0x posted on 2021-07-13 in Spark

How can I parse a JSON string containing nested lists into a Spark DataFrame in PySpark?
Input DataFrame:

+-------------+-----------------------------------------------+
|url          |json                                           |
+-------------+-----------------------------------------------+
|https://url.a|[[1572393600000, 1.000],[1572480000000, 1.007]]|
|https://url.b|[[1572825600000, 1.002],[1572912000000, 1.000]]|
+-------------+-----------------------------------------------+

root
 |-- url: string (nullable = true)
 |-- json: string (nullable = true)

Expected output:

+---------------------------------------+
|col_1 | col_2               | col_3    |
+---------------------------------------+
| a    | 1572393600000       |  1.000   | 
| a    | 1572480000000       |  1.007   |
| b    | 1572825600000       |  1.002   |
| b    | 1572912000000       |  1.000   |
+---------------------------------------+

Sample code:

import pyspark
import pyspark.sql.functions as F

spark = (pyspark.sql.SparkSession.builder.appName("Downloader_standalone")
    .master('local[*]')
    .getOrCreate())

sc = spark.sparkContext
from pyspark.sql import Row

rdd_list  = [('https://url.a','[[1572393600000, 1.000],[1572480000000, 1.007]]'),
             ('https://url.b','[[1572825600000, 1.002],[1572912000000, 1.000]]')]

jsons = sc.parallelize(rdd_list) 

df = spark.createDataFrame(jsons, "url string, json string")
df.show(truncate=False)
df.printSchema()

(df.withColumn('json', F.from_json(F.col('json'),"array<string,string>"))
.select(F.explode('json').alias('col_1', 'col_2', 'col_3')).show())

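I found a few related examples, but I still can't figure out how to do this:
How to parse and transform a JSON string from Spark DataFrame rows in PySpark
How to transform a JSON string with multiple keys from Spark DataFrame rows in PySpark?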

Answer #1 (d5vmydt9)

With a few replacements in the string followed by a split, you can get the desired result:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "col_1",
    # Strip the common URL prefix so only the suffix ("a", "b") remains
    F.regexp_replace("url", r"https://url\.", "")
).withColumn(
    "col_2_3",
    F.explode(
        # Raw string: Spark SQL receives '\\],\\[' and unescapes it to the regex \],\[
        F.expr(r"""transform(
            split(trim(both '][' from json), '\\],\\['),
            x -> struct(split(x, ',')[0] as col_2, split(x, ',')[1] as col_3)
        )""")
    )
).selectExpr("col_1", "col_2_3.*")

df1.show(truncate=False)

# +-----+-------------+------+
# |col_1|col_2        |col_3 |
# +-----+-------------+------+
# |a    |1572393600000| 1.000|
# |a    |1572480000000| 1.007|
# |b    |1572825600000| 1.002|
# |b    |1572912000000| 1.000|
# +-----+-------------+------+

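Explanation:
trim(both '][' from json) removes the leading and trailing [ and ] characters, which gives: 1572393600000, 1.000],[1572480000000, 1.007
You can now split on ],[ (the backslashes are there to escape the brackets).
transform takes the array produced by the split and, for each element, splits it on the comma and builds a struct with the fields col_2 and col_3.
Finally, explode the array of structs returned by transform and star-expand the struct column.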
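To see what each step produces, the same trim, split and per-element split can be traced in plain Python on one sample value. This is only an illustrative sketch outside Spark: parse_pairs is a hypothetical helper name, and Python's str.strip and re.split stand in for Spark SQL's trim(both ...) and split.

```python
import re

def parse_pairs(json_str):
    """Mimic the Spark SQL expression step by step on one string (hypothetical helper)."""
    # trim(both '][' from json): strip leading/trailing '[' and ']' characters
    trimmed = json_str.strip("[]")
    # split(..., '\],\['): split on the literal "],[" between the inner lists
    parts = re.split(r"\],\[", trimmed)
    # transform(...): split each element on ',' and keep the first two fields
    return [tuple(p.split(",")[:2]) for p in parts]

sample = "[[1572393600000, 1.000],[1572480000000, 1.007]]"
pairs = parse_pairs(sample)
print(pairs)
# [('1572393600000', ' 1.000'), ('1572480000000', ' 1.007')]
```

The second field keeps its leading space because the split is on a bare comma, which matches the " 1.000" values in the col_3 column of the output above.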

Answer #2 (sz81bmfz)

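# Parentheses let the method chain span several lines
(
    df.select(df.url, F.explode(F.from_json(df.json, "array<string>")))
    # Each exploded element is itself a JSON array string, so parse it again
    .select("url", F.from_json(F.col("col"), "array<string>").alias("col"))
    .select("url", F.col("col").getItem(0), F.col("col").getItem(1))
    .show(truncate=False)
)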

+-------------+-------------+------+
|url          |col[0]       |col[1]|
+-------------+-------------+------+
|https://url.a|1572393600000|1.0   |
|https://url.a|1572480000000|1.007 |
|https://url.b|1572825600000|1.002 |
|https://url.b|1572912000000|1.0   |
+-------------+-------------+------+
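
If the JSON may contain irregular spacing, another option is to parse it with Python's json module instead of string operations. The sketch below is not taken from either answer: flatten_row is a hypothetical helper shown on plain tuples, and it assumes that every inner list holds exactly two numbers and that the wanted key is the part of the URL after the last dot, as in the sample data.

```python
import json

def flatten_row(url, json_str):
    """Turn one (url, json) pair into a list of (col_1, col_2, col_3) tuples."""
    # Assumption: the part after the last '.' is the wanted key ("a", "b", ...)
    key = url.rsplit(".", 1)[-1]
    # json.loads handles the nested list and any whitespace between tokens
    return [(key, int(ts), float(value)) for ts, value in json.loads(json_str)]

rows = [
    ("https://url.a", "[[1572393600000, 1.000],[1572480000000, 1.007]]"),
    ("https://url.b", "[[1572825600000, 1.002],[1572912000000, 1.000]]"),
]
flat = [out for url, js in rows for out in flatten_row(url, js)]
for r in flat:
    print(r)
```

In Spark, a function like this could be applied with df.rdd.flatMap(lambda r: flatten_row(r["url"], r["json"])) and turned back into a DataFrame with toDF(["col_1", "col_2", "col_3"]), at the cost of running Python code per row instead of built-in SQL functions.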
