验证parquet文件中的空值

6vl6ewon  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(529)

我在看第三方的Parquet文件。似乎parquet总是将文件的模式转换为可为空的列,而不管它们是如何编写的。
在读取这些文件时,我想拒绝在特定列中包含空值的文件。使用csv或json,您可以:

schema = StructType([StructField("id", IntegerType(), False), StructField("col1", IntegerType(), False)])
df = spark.read.format("csv").schema(schema).option("mode", "FAILFAST").load(myPath)

如果加载中包含空值,则将拒绝加载 col1 . 如果你在Parquet地板上试试这个,它会被接受的。
我可以做一个过滤器,或者对空值的列进行计数,然后引发一个错误——从性能的Angular 来看,这是非常糟糕的,因为我将在作业中得到一个额外的阶段。它还将拒绝完整的Dataframe和所有文件(是的,csv路由也会这样做)。
在读取文件时是否有强制验证的方法?
如果有帮助的话,我用的是spark3版本。
编辑示例:

from pyspark.sql.types import *
schema = StructType([
  StructField("Id", IntegerType(), False),
  StructField("col1", IntegerType(), True)
])
df = spark.createDataFrame([(1,1),(2, None)], schema)
df.write.format("parquet").mode("overwrite").save("/tmp/parquetValidation/")
df2 = spark.read.format("parquet").load("/tmp/parquetValidation/")
df2.printSchema()

退货

|-- Id: integer (nullable = true) 
|-- col1: integer (nullable = true)

使用阻止空值的架构重新读取文件:

schema = StructType([
  StructField("Id", IntegerType(), False),
  StructField("col1", IntegerType(), False)
])
df3 = spark.read.format("parquet").schema(schema).option("mode", "FAILFAST").load("/tmp/parquetValidation/")
df3.printSchema()

退货:

|-- Id: integer (nullable = true) 
|-- col1: integer (nullable = true)

即未应用架构。

nwlls2ji

nwlls2ji1#

感谢@sasa对这个问题的评论。

from pyspark.sql import DataFrame

schema = StructType([
  StructField("Id", IntegerType(), False),
  StructField("col1", IntegerType(), False)
])

df_junk = spark.read.format("parquet").schema(schema).load("/tmp/parquetValidation/")

new_java_schema = spark._jvm.org.apache.spark.sql.types.DataType.fromJson(schema.json())
java_rdd = df_junk._jdf.toJavaRDD()
new_jdf = spark._jsparkSession.createDataFrame(java_rdd, new_java_schema)
df_validate = DataFrame(new_jdf, df.sql_ctx)

df_validate.printSchema()

退货

|-- Id: integer (nullable = false)
|-- col1: integer (nullable = false)

运行操作会导致:

java.lang.RuntimeException: The 1th field 'col1' of input row cannot be null.

使用javardd不太好,但它很有效

相关问题