我在看第三方的Parquet文件。似乎parquet总是将文件的模式转换为可为空的列,而不管它们是如何编写的。
在读取这些文件时,我想拒绝在特定列中包含空值的文件。使用csv或json,您可以:
schema = StructType([StructField("id", IntegerType(), False), StructField("col1", IntegerType(), False)])
df = spark.read.format("csv").schema(schema).option("mode", "FAILFAST").load(myPath)
如果加载中包含空值,则将拒绝加载 col1
. 如果你在Parquet地板上试试这个,它会被接受的。
我可以做一个过滤器,或者对空值的列进行计数,然后引发一个错误——从性能的Angular 来看,这是非常糟糕的,因为我将在作业中得到一个额外的阶段。它还将拒绝完整的Dataframe和所有文件(是的,csv路由也会这样做)。
在读取文件时是否有强制验证的方法?
如果有帮助的话,我用的是spark3版本。
编辑示例:
from pyspark.sql.types import *
schema = StructType([
StructField("Id", IntegerType(), False),
StructField("col1", IntegerType(), True)
])
df = spark.createDataFrame([(1,1),(2, None)], schema)
df.write.format("parquet").mode("overwrite").save("/tmp/parquetValidation/")
df2 = spark.read.format("parquet").load("/tmp/parquetValidation/")
df2.printSchema()
退货
|-- Id: integer (nullable = true)
|-- col1: integer (nullable = true)
使用阻止空值的架构重新读取文件:
schema = StructType([
StructField("Id", IntegerType(), False),
StructField("col1", IntegerType(), False)
])
df3 = spark.read.format("parquet").schema(schema).option("mode", "FAILFAST").load("/tmp/parquetValidation/")
df3.printSchema()
退货:
|-- Id: integer (nullable = true)
|-- col1: integer (nullable = true)
即未应用架构。
1条答案
按热度按时间nwlls2ji1#
感谢@sasa对这个问题的评论。
退货
运行操作会导致:
使用javardd不太好,但它很有效