How to keep a PySpark/Databricks job running and ignore bad records after task failures

syqv5f0l · posted 2021-07-12 in Spark
Follow (0) | Answers (1) | Views (336)

I am using Databricks with PySpark. An RDD contains a list of file paths, and I process each file with `rdd.map(func).collect()`, where `func` processes a single file and produces some output. However, `func` calls into a C library, and a corrupted file causes a segmentation fault in the task process. I cannot catch this error in Python.
Currently the job fails because the task is retried 4 times after the connection reset. Instead of Spark aborting, I want it to keep processing the remaining records, ignore the failed ones, and start a fresh task when one fails. The corrupted files can safely be skipped. How can I configure Spark/Databricks to do this?
Here is my attempt with multiprocessing and a timeout: if a call times out (e.g. due to a segmentation fault), just kill that worker process and continue with the next one without crashing the main process. It works fine without Spark, but inside Spark it fails with pickling errors, and multiprocessing does not seem to be supported on Databricks.

from multiprocessing import Pool, cpu_count

with Pool(min(process_no, cpu_count())) as pool:
    async_results = []
    for item in inputs:
        res = pool.apply_async(run_single, (item,))
        async_results.append(res)
    for res in async_results:
        try:
            res.get(timeout=20)
        except Exception:
            print("We lacked patience and got a multiprocessing.TimeoutError")
            continue
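Since neither a signal handler nor a multiprocessing pool in the same interpreter can survive a SIGSEGV from the C library, one alternative (a minimal sketch, not Databricks-specific) is to run the risky call in a separate OS process via `subprocess`, so a segfault kills only the child. The inline `-c` payload below is a hypothetical stand-in for the real per-file processing logic:

```python
import subprocess
import sys

def run_isolated(path, timeout=20):
    # Run the per-file logic in a child interpreter; a segfault in the
    # C library then kills only the child, not the calling process.
    # The -c payload is a placeholder for the real processing script.
    cmd = [sys.executable, "-c",
           "import sys; print('processed ' + sys.argv[1])", path]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True,
                              timeout=timeout, check=True)
        return proc.stdout.strip()
    except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
        return None  # crashed or hung child: treat this file as skipped
```

Called from inside a Spark task, a crashed or hung child simply yields `None` (which a `flatMap` wrapper can drop), instead of taking the whole Python worker down with it.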

Update:
Here is the code. I have 4 machines with 16 cores each (running 64 parallel tasks).

def func(input):
    import signal
    def call_the_actual_logic(input):
        .....
        return [0]

    def timeoutHandler(signum, frame):
        raise TimeoutError("Task takes too long")

    try:
        # install a timeout handler inside of the Spark task
        # following https://docs.python.org/3/library/signal.html#example
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(10)  # cancel the task after 10 seconds

        # call the actual logic
        result = call_the_actual_logic(input)
        signal.alarm(0)

        return result
    except TimeoutError:
        print("Logic for input {} took too long. Cleaning up...".format(input))
        return []
    except Exception as e:
        print("Logic for input {} threw an exception. Cleaning up...".format(input))
        return []

a = rdd.repartition(2048).flatMap(func).collect()

Below is the exception Spark throws. I suspect a worker crashed because of a corrupted file and an uncaught error, since I also see some processing errors. Judging from the error below, it seems to be a different exception, though. If a worker process crashes, is it possible to ignore the corrupted file and restart the worker?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 49.0 failed 4 times, most recent failure: Lost task 162.3 in stage 49.0 (TID 20313, 10.139.64.4, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-3638575280205630> in <module>
     87 
     88 
---> 89 a = rdd.repartition(2048).flatMap(main_run_spark).collect()

/databricks/spark/python/pyspark/rdd.py in collect(self)
    901         # Default path used in OSS Spark / for non-credential passthrough clusters:
    902         with SCCallSiteSync(self.context) as css:
--> 903             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    904         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    905 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
    125     def deco(*a,**kw):
    126         try:
--> 127             return f(*a,**kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 49.0 failed 4 times, most recent failure: Lost task 162.3 in stage 49.0 (TID 20313, 10.139.64.4, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:636)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:625)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:743)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:721)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:556)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1011)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2373)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:728)
    ... 30 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2354)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2373)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1011)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:395)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1010)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:260)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.GeneratedMethodAccessor559.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:636)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:625)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:743)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:721)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:556)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1011)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2373)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:728)
    ... 30 more
nhaq1z21 · Answer 1#

You can implement error and/or timeout handling inside `func`, so that each Spark task gets its own independent exception handling.
To invoke the function, use `flatMap` instead of `map`, so that `func` can return either one result or none:

def func(input):
    import signal

    def timeoutHandler(signum, frame):
        raise TimeoutError("Task takes too long")

    try:
        # install a timeout handler inside of the Spark task
        # following https://docs.python.org/3/library/signal.html#example
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(5)  # cancel the task after 5 seconds

        # call the actual logic
        result = call_the_actual_logic(input)
        signal.alarm(0)

        # return an iterator with a single element
        return iter([result])

    except TimeoutError:
        # handle a timeout and return an empty iterator
        print("Logic for input {} took too long. Cleaning up...".format(input))
        return iter([])
    except Exception:
        # handle any other exception and return an empty iterator
        print("Logic for input {} threw an exception. Cleaning up...".format(input))
        return iter([])

def call_the_actual_logic(input):
    # call the C libraries and return the result
    return ...

# build the transformed RDD (note: flatMap is lazy, so an action such as
# collect() is what actually triggers the computation)

rdd2 = rdd.flatMap(func)
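The zero-or-one-result contract this relies on can be illustrated without Spark (a minimal sketch, with `itertools.chain.from_iterable` playing the role of `flatMap`):

```python
from itertools import chain

def safe_func(x):
    # Mirror the flatMap contract: a one-element list on success,
    # an empty list when the input is bad, so failures simply vanish
    # from the flattened output.
    try:
        return [100 // x]  # stand-in for the real per-file logic
    except ZeroDivisionError:
        return []

inputs = [1, 2, 0, 4]
results = list(chain.from_iterable(safe_func(x) for x in inputs))
# the bad input (0) contributes nothing, yet the other results survive
```

The flattened list contains one entry per good input and nothing for the bad one, which is exactly how `rdd.flatMap(func)` lets a failed record disappear without aborting the job.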

Disclaimer: I tested this code on a plain vanilla Spark installation on a Linux machine, not in a Databricks notebook.
