我有一个Databricks笔记本,它读取csv文件作为ETL管道的第一步。有时csv文件没有所需的模式,这会导致笔记本崩溃。我需要在这些错误发生时处理它们,而不是让整个管道崩溃。
下面是我尝试处理这些错误的代码。当读取错误的csv文件时,我希望输出为“Exception caught”。
try:
newData = (spark.read
.format("csv")
.option("delimiter", "|")
.option("mode", "FAILFAST")
.option("inferSchema", "false")
.option("enforceSchema", "true")
.option("header", "True")
.schema(schema)
.load(bronzePath + "/"+ fileName + "*")
)
newData.display()
except FileReadException as e:
# Do stuff, handle exception
print("Exception caught")
但是这个异常永远不会被捕获,我得到了完整的异常作为输出。
FileReadException: Error while reading file dbfs:/mnt/datalake/bronze/<myPath>/<myFile.csv>.
Caused by: SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
Caused by: BadRecordException: org.apache.spark.sql.catalyst.csv.MalformedCSVException: Malformed CSV record
Caused by: MalformedCSVException: Malformed CSV record
Googling the issue帮助我理解了在pyspark中不可能捕获scala异常。
是否有其他方法可以捕获此异常?我还有其他选择吗?
我需要以某种方式处理收到的任何错误的csv文件。使用PERMISSIVE或DROPMALFORMED模式对我来说不是一个选择,因为我需要做出React并处理错误文件。
1条答案
按热度按时间qyyhg6bp1#
您可以使用PythonException类捕获错误。这段代码可以让你捕获异常。
我在csv的“ID”列中添加了一个字符串,以产生错误,因为我希望通过structype函数在ID列中输入一个整数。
下图显示了csv文件内容:
下面是程序的输出: