How to parse a JSON string of nested lists into a Spark DataFrame in PySpark?

jjjwad0x, posted on 2021-07-13 in Spark

How can I parse a JSON string containing nested lists into a Spark DataFrame in PySpark?
Input DataFrame:

    +-------------+-----------------------------------------------+
    |url          |json                                           |
    +-------------+-----------------------------------------------+
    |https://url.a|[[1572393600000, 1.000],[1572480000000, 1.007]]|
    |https://url.b|[[1572825600000, 1.002],[1572912000000, 1.000]]|
    +-------------+-----------------------------------------------+

    root
     |-- url: string (nullable = true)
     |-- json: string (nullable = true)

Expected output:

    +-------+---------------+-------+
    | col_1 | col_2         | col_3 |
    +-------+---------------+-------+
    | a     | 1572393600000 | 1.000 |
    | a     | 1572480000000 | 1.007 |
    | b     | 1572825600000 | 1.002 |
    | b     | 1572912000000 | 1.000 |
    +-------+---------------+-------+

Sample code:

    import pyspark
    import pyspark.sql.functions as F
    from pyspark.sql import Row

    spark = (pyspark.sql.SparkSession.builder.appName("Downloader_standalone")
             .master('local[*]')
             .getOrCreate())
    sc = spark.sparkContext

    # Two sample rows: a URL and a JSON string holding a list of [timestamp, value] pairs
    rdd_list = [('https://url.a', '[[1572393600000, 1.000],[1572480000000, 1.007]]'),
                ('https://url.b', '[[1572825600000, 1.002],[1572912000000, 1.000]]')]
    jsons = sc.parallelize(rdd_list)
    df = spark.createDataFrame(jsons, "url string, json string")
    df.show(truncate=False)
    df.printSchema()

    # Attempt at parsing; this is the step I cannot get to work
    (df.withColumn('json', F.from_json(F.col('json'), "array<string,string>"))
     .select(F.explode('json').alias('col_1', 'col_2', 'col_3')).show())

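There are a few examples, but I cannot figure out how to apply them:
How to parse and transform a JSON string from Spark DataFrame rows in PySpark
How to transform a JSON string with multiple keys, from Spark DataFrame rows in PySpark?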

d5vmydt9 #1

With a few replacements and splits on the string, you can get the desired result:

    from pyspark.sql import functions as F

    df1 = df.withColumn(
        "col_1",
        F.regexp_replace("url", "https://url.", "")
    ).withColumn(
        "col_2_3",
        F.explode(
            F.expr("""transform(
                split(trim(both '][' from json), '\\\],\\\['),
                x -> struct(split(x, ',')[0] as col_2, split(x, ',')[1] as col_3)
            )""")
        )
    ).selectExpr("col_1", "col_2_3.*")

    df1.show(truncate=False)
    # +-----+-------------+------+
    # |col_1|col_2        |col_3 |
    # +-----+-------------+------+
    # |a    |1572393600000| 1.000|
    # |a    |1572480000000| 1.007|
    # |b    |1572825600000| 1.002|
    # |b    |1572912000000| 1.000|
    # +-----+-------------+------+

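Explanation:
1. trim(both '][' from json) removes the leading and trailing [ and ] characters, leaving something like: 1572393600000, 1.000],[1572480000000, 1.007
2. That string can now be split on ],[ (the \\\ sequences escape the brackets, because split takes a regular expression).
3. transform takes the array produced by the split and, for each element, splits it on the comma and builds a struct with fields col_2 and col_3.
4. explode turns the resulting array of structs into one row per struct, and col_2_3.* star-expands the struct into separate columns.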
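The steps above can be traced on a single value in plain Python, without Spark. This is only a sketch for illustration: parse_pairs is a hypothetical helper name, and str.strip and re.split stand in for the Spark SQL trim and split functions.

```python
import re


def parse_pairs(json_str):
    """Mirror the trim -> split -> transform steps on one string."""
    # Step 1: strip the leading and trailing '[' and ']' characters.
    trimmed = json_str.strip("[]")
    # Step 2: split on the literal "],[" (brackets escaped in the regex).
    parts = re.split(r"\],\[", trimmed)
    # Step 3: split each part on the comma into a (col_2, col_3) pair.
    return [tuple(part.split(",")) for part in parts]


rows = parse_pairs("[[1572393600000, 1.000],[1572480000000, 1.007]]")
print(rows)
```

Splitting on the bare comma keeps the space after it, which is why col_3 carries a leading space in the output shown in this answer.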

sz81bmfz #2

    import pyspark.sql.functions as F

    # Pass 1: parse the outer array and explode it, so each inner list becomes a string in column "col"
    # Pass 2: parse each inner list string into an array and pick its two items
    (df.select(df.url, F.explode(F.from_json(df.json, "array<string>")))
       .select("url", F.from_json(F.col("col"), "array<string>").alias("col"))
       .select("url", F.col("col").getItem(0), F.col("col").getItem(1))
       .show(truncate=False))
    # +-------------+-------------+------+
    # |url          |col[0]       |col[1]|
    # +-------------+-------------+------+
    # |https://url.a|1572393600000|1.0   |
    # |https://url.a|1572480000000|1.007 |
    # |https://url.b|1572825600000|1.002 |
    # |https://url.b|1572912000000|1.0   |
    # +-------------+-------------+------+
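The two-pass idea in this answer can be imitated with Python's standard json module. This is an analogy under stated assumptions, not what Spark runs internally: two_pass_parse is a hypothetical helper, and json.dumps stands in for Spark keeping each inner list as a string after the first from_json call.

```python
import json


def two_pass_parse(json_str):
    """Parse the outer list first, then each inner list, as strings."""
    # Pass 1: parse the outer array and keep every inner list as a JSON string.
    inner_strings = [json.dumps(item) for item in json.loads(json_str)]
    # Pass 2: parse each inner string and render its values as strings.
    return [[str(value) for value in json.loads(s)] for s in inner_strings]


pairs = two_pass_parse("[[1572393600000, 1.000],[1572480000000, 1.007]]")
print(pairs)
```

Because the values pass through a JSON number parser, 1.000 comes back as 1.0, which matches the col[1] values in the output above. If the original text of the number matters, the string-splitting approach from the first answer preserves it.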
