I am using Databricks and PySpark. An rdd contains a list of file paths, and I process each file with rdd.map(func).collect(), where func processes one file and produces some output. However, func uses a C library, and a bad file causes a segmentation fault in the task process. I cannot catch this error.
Currently the job fails because the task is retried 4 times after the connection reset. Instead of Spark stopping, I want it to keep processing the remaining records, ignore the failed ones, and start a new task when one fails. Those corrupted files can safely be ignored. How can I configure Spark/Databricks to do this?
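For reference, the retry limit behind the "failed 4 times" message is spark.task.maxFailures. As an illustration (my assumption, not a fix: more retries will not help when the same corrupted file crashes the worker deterministically), it can be raised in the cluster's Spark config on Databricks:

# Databricks cluster configuration -> Advanced options -> Spark config
spark.task.maxFailures 10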
Here is my multiprocessing attempt with a timeout: if the timeout is caused by a segmentation fault, just kill that process and move on to the next one without crashing the main process. It works fine without Spark, but inside Spark it fails with pickle errors, and multiprocessing does not seem to be supported in Databricks anyway.
from multiprocessing import Pool, cpu_count

with Pool(min(process_no, cpu_count())) as pool:
    async_results = []
    for input in inputs:
        async_results.append(pool.apply_async(run_single, (input,)))
    for res in async_results:
        try:
            res.get(timeout=20)
        except Exception:
            print("We lacked patience and got a multiprocessing.TimeoutError")
            continue
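A sketch of an alternative I am considering, which would avoid pickling entirely and survive segmentation faults: launch the fragile C-library call in a throwaway child process with subprocess, so a crash only kills the child. Here process_one_file.py is a hypothetical wrapper script around the C library (an assumption, not part of my current code):

import subprocess
import sys

def run_isolated(path, timeout=20):
    # process_one_file.py is a hypothetical script that wraps the C library,
    # prints its result to stdout and exits non-zero on failure
    try:
        proc = subprocess.run(
            [sys.executable, "process_one_file.py", path],
            capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return []  # the child hung on a bad file: skip it
    if proc.returncode != 0:
        # a negative return code means the child was killed by a signal
        # (e.g. -11 for SIGSEGV); either way, skip the corrupted file
        return []
    return [proc.stdout]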
Update:
Here is the code. I have 4 machines with 16 cores each (running 64 parallel tasks).
def func(input):
    import signal

    def call_the_actual_logic(input):
        .....
        return [0]

    def timeoutHandler(signum, frame):
        raise TimeoutError("Task takes too long")

    try:
        # install a timeout handler inside of the Spark task
        # following https://docs.python.org/3/library/signal.html#example
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(10)  # cancel the task after 10 seconds
        # call the actual logic
        result = call_the_actual_logic(input)
        signal.alarm(0)
        return result
    except TimeoutError:
        print("Logic for input {} took too long. Cleaning up...".format(input))
        return []
    except Exception as e:
        print("Logic for input {} threw an exception. Cleaning up...".format(input))
        return []

a = rdd.repartition(2048).flatMap(func).collect()
Below is the exception Spark throws. I suspect a worker crashed because of a corrupted file and an uncaught error, since I see some processing errors before it; judging from the error below, it seems to be a different exception, though. If a worker process crashes, is it possible to ignore the corrupted file and restart the worker?
org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 49.0 failed 4 times, most recent failure: Lost task 162.3 in stage 49.0 (TID 20313, 10.139.64.4, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-3638575280205630> in <module>
87
88
---> 89 a = rdd.repartition(2048).flatMap(main_run_spark).collect()
/databricks/spark/python/pyspark/rdd.py in collect(self)
901 # Default path used in OSS Spark / for non-credential passthrough clusters:
902 with SCCallSiteSync(self.context) as css:
--> 903 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
904 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
905
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
125 def deco(*a,**kw):
126 try:
--> 127 return f(*a,**kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 49.0 failed 4 times, most recent failure: Lost task 162.3 in stage 49.0 (TID 20313, 10.139.64.4, executor 4): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:636)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:625)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:743)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:721)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:556)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1011)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2373)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:728)
... 30 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2354)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2373)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1011)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:395)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1010)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:260)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.GeneratedMethodAccessor559.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:636)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:625)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:743)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:721)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:556)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1011)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2373)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:677)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:680)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:728)
... 30 more
1 Answer
You can implement error and/or timeout handling inside func, so that each Spark task gets its own independent exception handling. To call the function, use flatMap instead of map, so that func can return one or zero results.
Disclaimer: I have tested this code on a plain vanilla Spark installation on a Linux machine, not in a Databricks notebook.
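A minimal sketch of that pattern (process_file here is a hypothetical stand-in for the actual per-file logic, which may raise on corrupted input):

def func(path):
    try:
        return [process_file(path)]  # hypothetical helper; one result on success
    except Exception:
        return []  # zero results: the corrupted record is silently skipped

a = rdd.flatMap(func).collect()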