pyspark databricks spark允许列中的空值,即使架构可为空false

vfh0ocws  于 2023-06-28  发布在  Spark
关注(0)|答案(1)|浏览(141)

我有以下模式

json_schema_string = "{\"fields\":[{\"metadata\":{},\"name\":\"Name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"empid\",\"nullable\":false,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"age\",\"nullable\":false,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"ph_number\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"address\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}"

转换成

StructType([StructField('Name', StringType(), True), StructField('empid', IntegerType(), False), StructField('age', IntegerType(), False), StructField('ph_number', LongType(), True), StructField('address', StringType(), True)])

我正在运行以下代码=>

df_current = spark.read\
.option("header","true")\
.schema(json_schema)\
.option("inferSchema","false")\
.option("badRecordsPath", f"{bad_records_path_session}/{STEP_NAME}") \
.option("multiLine", "true")\
.option('escape', "\"")\
.option('delimiter', ',')\
.option("encoding", "UTF-8")\
.csv(file_path)\
.selectExpr("*", "lower(_metadata.file_name) as pmd_file_path", "_metadata.file_modification_time as pmd_file_mod_DTM", f"{time_of_run} as pmd_batch_id")

##df_current.write.format("delta").mode("append").saveAsTable(f"{catalog_name}.dnu_development.test_reject_records")
df_current.show()

但是当我打印**df_current.show()**时,我可以在字段中看到Null,因此=>reject_file_loc中缺少行(参考屏幕截图)

mwg9r5ms

mwg9r5ms1#

Spark模式中的 nullable 属性用于指定列是否允许空值。
在您的代码中,您使用 nullable=False 定义了age列,这意味着该列不应包含空值。
需要注意的是,模式中的nullable属性主要用于在读取操作期间强制执行模式,以及优化数据处理。当您将列定义为nullable=False时,Spark确保列值符合指定的数据类型,并且在模式推断或显式应用模式时不包含空值。
在您的情况下,如果您有一个CSV文件,其中年龄列具有空值,则它们将被视为空字符串或非空值。该列将不会仅基于空检查被过滤掉,因为空字符串被视为非空值。
如果要将空字符串视为null值并根据null检查将其筛选掉
在你的情况下,你可以使用下面的代码:

df_null = df_current.filter(col("Name").isNull() |
                            col("empid").isNull() |
                            col("age").isNull() |
                            (col("age") == "") |
                            col("ph_number").isNull() |
                            col("address").isNull())

通过将(col(“age”)==“”)添加到筛选条件中,它将同时检查null和空字符串值,从而有效地筛选出age列为null或空的行。这可确保DataFrame仅保存年龄列中具有非空和非空字符串值的记录。(OR)我还尝试了一种方法,可以编写过滤NULL值并将它们写入路径。

  • 在提供文件路径和输出路径的帮助下实现非空值沿着记录空值
  • Path下面的这些指定输入CSV文件的文件路径(file_path)以及用于保存具有空记录(output_path_null)和非空记录(output_path_non_null)的DataFrames的输出路径。

file_path="/FileStore/tables/Convertedjson.csv”output_path_null =“/FileStore/tables/null_records”output_path_non_null =“/FileStore/tables/non_null_records”

下面是代码:

from pyspark.sql import SparkSession
        from pyspark.sql.types import StructType,
        StructField, StringType, IntegerType,  LongType
        from pyspark.sql.functions import col
        spark = SparkSession.builder.getOrCreate()
        Converted_json_schema = StructType([
        StructField('Name', StringType(), True),
        StructField('empid', IntegerType(), False),
        StructField('age', IntegerType(), False),
        StructField('ph_number', LongType(), True),
        StructField('address', StringType(), True)
        ])
        file_path = "/FileStore/tables/Convertedjson.csv"
        output_path_null = "/FileStore/tables/null_records"
        output_path_non_null = "/FileStore/tables/non_null_records"
        df_current = spark.read \
        .option("header", "true") \
        .schema(Converted_json_schema) \
        .option("inferSchema", "false") \
        .option("multiLine", "true") \
        .option('escape', "\"") \
        .option('delimiter', ',') \
        .option("encoding", "UTF-8") \
        .csv(file_path)
        **Filter DataFrame for null records**
        df_null = df_current.filter(col("Name").isNull() |
        col("empid").isNull() |
        col("age").isNull() |
        col("ph_number").isNull() |
        col("address").isNull())
        df_null.show()
        **Save the DataFrame with null records to a specific location (overwrite if it already exists)**
        df_null.write.mode("overwrite").option("header", "true").csv(output_path_null)
        **Filter DataFrame for non-null records**
        df_non_null = df_current.filter(col("Name").isNotNull() &
        col("empid").isNotNull() &
        col("age").isNotNull() &
        col("ph_number").isNotNull() &
        col("address").isNotNull())
        df_non_null.show()
        **Save the DataFrame with non-null records to a specific location (overwrite if it already exists)**
        df_non_null.write.mode("overwrite").option("header", "true").csv(output_path_non_null)

过滤DataFrame空记录:

  • 在上面的代码中,使用filter函数过滤df_current DataFrame,并使用isNull函数检查每列中的空值。
  • 保留任何指定列具有空值的行。生成的DataFrame(只包含空记录)存储在df_null中,然后使用show()显示。
    将空记录的DataFrame保存到特定位置:df_null.write.mode("overwrite").option("header", "true").csv(output_path_null)此代码将***df_null***DataFrame保存为CSV

相关问题