我有以下模式
json_schema_string = "{\"fields\":[{\"metadata\":{},\"name\":\"Name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"empid\",\"nullable\":false,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"age\",\"nullable\":false,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"ph_number\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"address\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}"
转换成
StructType([StructField('Name', StringType(), True), StructField('empid', IntegerType(), False), StructField('age', IntegerType(), False), StructField('ph_number', LongType(), True), StructField('address', StringType(), True)])
我正在运行以下代码=>
df_current = spark.read\
.option("header","true")\
.schema(json_schema)\
.option("inferSchema","false")\
.option("badRecordsPath", f"{bad_records_path_session}/{STEP_NAME}") \
.option("multiLine", "true")\
.option('escape', "\"")\
.option('delimiter', ',')\
.option("encoding", "UTF-8")\
.csv(file_path)\
.selectExpr("*", "lower(_metadata.file_name) as pmd_file_path", "_metadata.file_modification_time as pmd_file_mod_DTM", f"{time_of_run} as pmd_batch_id")
##df_current.write.format("delta").mode("append").saveAsTable(f"{catalog_name}.dnu_development.test_reject_records")
df_current.show()
但是当我打印**df_current.show()**时,我可以在字段中看到Null,因此=>reject_file_loc中缺少行(参考屏幕截图)
1条答案
按热度按时间mwg9r5ms1#
Spark模式中的 nullable 属性用于指定列是否允许空值。
在您的代码中,您使用 nullable=False 定义了age列,这意味着该列不应包含空值。
需要注意的是,模式中的nullable属性主要用于在读取操作期间强制执行模式,以及优化数据处理。当您将列定义为nullable=False时,Spark确保列值符合指定的数据类型,并且在模式推断或显式应用模式时不包含空值。
在您的情况下,如果您有一个CSV文件,其中年龄列具有空值,则它们将被视为空字符串或非空值。该列将不会仅基于空检查被过滤掉,因为空字符串被视为非空值。
如果要将空字符串视为null值并根据null检查将其筛选掉
在你的情况下,你可以使用下面的代码:
通过将(col(“age”)==“”)添加到筛选条件中,它将同时检查null和空字符串值,从而有效地筛选出age列为null或空的行。这可确保DataFrame仅保存年龄列中具有非空和非空字符串值的记录。(OR)我还尝试了一种方法,可以编写过滤NULL值并将它们写入路径。
file_path="/FileStore/tables/Convertedjson.csv”output_path_null =“/FileStore/tables/null_records”output_path_non_null =“/FileStore/tables/non_null_records”
下面是代码:
过滤DataFrame空记录:
将空记录的DataFrame保存到特定位置:
df_null.write.mode("overwrite").option("header", "true").csv(output_path_null)
此代码将***df_null***DataFrame保存为CSV