pyspark Spark CSV阅读器-CSV最后一列中的JSON内容格式错误

d7v8vwbk  于 2023-11-16  发布在  Spark
关注(0)|答案(4)|浏览(150)

我在CSV文件中有数据,如下所示:

  1. 1,2,3,{col1:1,col2:2,col3:3}
  2. 1,2,3,{col1:2,col2:2,col3:4}
  3. 1,2,3,{col1:3,col2:3,col3:3}

字符串
使用Spark代码读取此数据,以在框架中以这种格式输出:

  1. col1,col2,col3
  2. 1,2,3
  3. 2,2,4
  4. 3,3,3


我尝试了几种使用引号和自定义模式的方法。但是,我无法正确地将所有数据都包含在框架中。使用自定义模式,我能够读取如下所示的 df(在第一个“,“之后截断):

  1. c1,c2,c3,c4
  2. 1,2,3,{col1:1
  3. 1,2,3,{col1:2
  4. 1,2,3,{col1:3

nle07wnf

nle07wnf1#

这里的问题是,您的CSV分隔符也用于JSON列中,而没有对其进行转义或引用。最好的选择是要求您的源提供更好格式的CSV文件(或使用不同的分隔符)。
如果这不是一个选项,您可以在阅读文件后自己拆分文件。对于您的示例,可以如下所示完成。在这里,我们将CSV内容作为文本文件读取,并将其拆分到第4个逗号。从结果数组中,我们选择我们想要的列。

  1. import pyspark.sql.functions as F
  2. df = spark.read.text('data.csv')
  3. cols_to_split = 4
  4. splitted = F.split(F.col('value'), ',', limit=cols_to_split)
  5. df.select([splitted[i].alias(f'col{i}') for i in range(cols_to_split)]).show()

字符串
输出量:

  1. +----+----+----+--------------------+
  2. |col0|col1|col2| col3|
  3. +----+----+----+--------------------+
  4. | 1| 2| 3|{col1:1,col2:2,co...|
  5. | 1| 2| 3|{col1:2,col2:2,co...|
  6. | 1| 2| 3|{col1:3,col2:3,co...|
  7. +----+----+----+--------------------+


请注意,您的真实的数据可能更复杂或结构不同,此解决方案可能不起作用。split 函数将正则表达式作为模式,因此您可以编写适合您的内容。
最后,请注意,自Spark >= 3.0以来,limit参数已被支持。

展开查看全部
w8f9ii69

w8f9ii692#

我认为你必须使用from_json函数。
定义JSON列的模式:

  1. from pyspark.sql.functions import from_json
  2. from pyspark.sql.types import StructType, StructField, IntegerType
  3. json_schema = StructType(
  4. [
  5. StructField("col1", IntegerType()),
  6. StructField("col2", IntegerType()),
  7. StructField("col3", IntegerType()),
  8. ]
  9. )

字符串

示例片段

将JSON字符串解析为结构

  1. df = df.withColumn("c4", from_json("json_str", json_schema))

展开查看全部
de90aj5v

de90aj5v3#

我遇到过类似的问题,当我使用 escape 选项时,它得到了解决。

  1. df = spark.read.format('csv').options(header='True', delimiter=",", escape='"').load(path)

字符串

z4iuyo4d

z4iuyo4d4#

  1. from pyspark.sql import SparkSession
  2. from pyspark.sql.functions import col, from_json
  3. from pyspark.sql.types import StructType, StructField, StringType, IntegerType
  4. spark = SparkSession.builder.appName("CSV Read Example").getOrCreate()
  5. json_schema = StructType([
  6. StructField("col1", IntegerType()),
  7. StructField("col2", IntegerType()),
  8. StructField("col3", IntegerType())
  9. ])
  10. df = spark.read.csv("path_to_your_file.csv")
  11. df = df.withColumnRenamed("_c0", "c1")\
  12. .withColumnRenamed("_c1", "c2")\
  13. .withColumnRenamed("_c2", "c3")\
  14. .withColumnRenamed("_c3", "json_col")
  15. df = df.withColumn("json_col", from_json("json_col", json_schema))
  16. df = df.select(
  17. col("json_col.col1").alias("col1"),
  18. col("json_col.col2").alias("col2"),
  19. col("json_col.col3").alias("col3")
  20. )
  21. df.show()
  22. spark.stop()

字符串

展开查看全部

相关问题