在PySpark中捕获spark异常

g6ll5ycj  于 2023-05-06  发布在  Spark
关注(0)|答案(1)|浏览(210)

我有一个Databricks笔记本,它读取csv文件作为ETL管道的第一步。有时csv文件没有所需的模式,这会导致笔记本崩溃。我需要在这些错误发生时处理它们,而不是让整个管道崩溃。
下面是我尝试处理这些错误的代码。当读取错误的csv文件时,我希望输出为“Exception caught”。

try:
    newData = (spark.read
              .format("csv")
              .option("delimiter", "|")
              .option("mode", "FAILFAST")
              .option("inferSchema", "false")
              .option("enforceSchema", "true")
              .option("header", "True")
              .schema(schema)
              .load(bronzePath + "/"+ fileName + "*")
         )
    newData.display()
except FileReadException as e:
    # Do stuff, handle exception
    print("Exception caught")

但是这个异常永远不会被捕获,我得到了完整的异常作为输出。

FileReadException: Error while reading file dbfs:/mnt/datalake/bronze/<myPath>/<myFile.csv>.
Caused by: SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
Caused by: BadRecordException: org.apache.spark.sql.catalyst.csv.MalformedCSVException: Malformed CSV record
Caused by: MalformedCSVException: Malformed CSV record

Googling the issue帮助我理解了在pyspark中不可能捕获scala异常。
是否有其他方法可以捕获此异常?我还有其他选择吗?
我需要以某种方式处理收到的任何错误的csv文件。使用PERMISSIVE或DROPMALFORMED模式对我来说不是一个选择,因为我需要做出React并处理错误文件。

qyyhg6bp

qyyhg6bp1#

您可以使用PythonException类捕获错误。这段代码可以让你捕获异常。
我在csv的“ID”列中添加了一个字符串,以产生错误,因为我希望通过structype函数在ID列中输入一个整数。

from pyspark.sql.types import StructType, StructField, IntegerType, 
StringType

schema = StructType([StructField("id", IntegerType()),StructField("name", 
StringType())])

try:
  newData = (spark.read
          .format("csv")
          .option("delimiter", "|")
          .option("mode", "FAILFAST")
          .option("inferSchema", "false")
          .option("enforceSchema", "true")
          .option("header", "True")
          .schema(schema)
          .load("/mnt/asda-poc/source/interface_3")
     )
  newData.write.saveAsTable("test")
except Exception as e:
  # Do stuff, handle exception
  print("Following is your exception = ",e)

下图显示了csv文件内容:

下面是程序的输出:

相关问题