pyspark 阅读具有不同架构的CSV文件

vktxenjb  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(192)

我在S3上有两个csv文件:

  1. # a1.csv
  2. a,b
  3. 3,4

字符串

  1. # b2.csv
  2. a,c
  3. 1,"text"


我想一次读取这两个文件,确保最终的嵌套框架包含所有文件中的所有列,如下所示:

  1. +---+----+----+
  2. | a| b| c|
  3. +---+----+----+
  4. | 1|null|text|
  5. | 3| 4|null|
  6. +---+----+----+


我尝试了inferSchemaschema选项,但它们没有提供我期望的结果。
选项1:

  1. df = spark.read\
  2. .option("header", True)\
  3. .option("inferSchema", True)\
  4. .csv("s3a://test/*.csv")\
  5. .show()
  6. +---+----+
  7. | a| c|
  8. +---+----+
  9. | 1|text|
  10. | 3| 4|
  11. +---+----+


备选方案2:

  1. from pyspark.sql.types import StructType, StructField, IntegerType, StringType
  2. schema = StructType([
  3. StructField("a", IntegerType(), False)
  4. ,StructField("b", IntegerType(), True)
  5. ,StructField("c", StringType(), True)
  6. ])
  7. df = spark.read\
  8. .option("header", True)\
  9. .schema(schema)\
  10. .csv("s3a://test/*.csv")\
  11. .show()
  12. +---+----+----+
  13. | a| b| c|
  14. +---+----+----+
  15. | 1|null|null|
  16. | 3| 4|null|
  17. +---+----+----+


有什么办法吗?

wpcxdonn

wpcxdonn1#

如果文件格式是parquet,那么我们可以通过指向包含多个文件的文件夹使用mergeSchema选项轻松合并模式,但对于CSV文件,我们没有该选项。
您可以使用unionByName函数来获得所需的结果。
遍历文件夹,将文件读入到嵌套框架中,然后调用unionByName函数。

  1. df1.unionByName(df2, True)

字符串

相关问题