PySpark raises a task failure error when initializing a new column with a UDF

Asked by pvabu6sv on 2023-05-28 in Spark

I have a Spark DataFrame:

+--------------------+--------------------+------+
|                 qid|       question_text|target|
+--------------------+--------------------+------+
|403d7d49a9713e6f7caa|Do coconut trees ...|     0|
|edc9a2709785501cce09|What are 5 must-r...|     0|
|c912510d490de9e8c55c|How can I make my...|     0|
|bcd2395ccebea4d57604|Is reading biogra...|     0|
|d26dcd0879465c08ace5|Is the reason why...|     1|
|1c6398530de0b31e985a|Does John McCain ...|     1|
|69c68408690e66889e6a|Isn't the time th...|     1|
|d1a7d8a1da31041048a6|Why do people eat...|     0|
|54e96f880709e3cf9dd7|How do I get the ...|     0|
|f89c04a1c61487623ba9|What does the kno...|     0|
+--------------------+--------------------+------+

I am trying to apply this UDF:

# nlp is assumed to be a spaCy pipeline with the contextualSpellCheck
# component, which provides the doc._.performed_spellCheck extension
# (see the sketch below).
def check_all_spelling_correct(sentence):
    doc = nlp(sentence)
    spell_check_performed = 0
    if doc._.performed_spellCheck:
        spell_check_performed = 1
    return spell_check_performed
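
For context, the question does not show how nlp was created; given the doc._.performed_spellCheck usage above, it was presumably set up along these lines (a hypothetical sketch, assuming the contextualSpellCheck package and an installed spaCy model):

import spacy
import contextualSpellCheck  # pip install contextualSpellCheck

# Load a spaCy model and add the contextual spell checker to the pipeline;
# this registers the doc._.performed_spellCheck extension attribute.
nlp = spacy.load("en_core_web_sm")
contextualSpellCheck.add_to_pipe(nlp)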

Applied with:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

check_all_spelling_correct_UDF = udf(lambda x: check_all_spelling_correct(x), IntegerType())
df_sub_2 = df_sub.withColumn("Is spelling correct", check_all_spelling_correct_UDF(col("question_text")))
df_sub_2.collect()

But I get this error:

Py4JJavaError: An error occurred while calling o235.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 100.0 failed 1 times, most recent failure: Lost task 0.0 in stage 100.0 (TID 1441) (DESKTOP-2TQTOQ2 executor driver): org.apache.spark.SparkException: Python worker failed to connect back. at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121) at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162) at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81) at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.net.SocketTimeoutException: Accept timed out at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535) at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189) at java.net.ServerSocket.implAccept(ServerSocket.java:545) at java.net.ServerSocket.accept(ServerSocket.java:513) at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175) ... 24 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279) at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at org.apache.spark.rdd.RDD.collect(RDD.scala:1029) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:394) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:338) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:366) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338) at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Python worker failed to connect back. at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121) at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162) at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81) at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: java.net.SocketTimeoutException: Accept timed out at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535) at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189) at java.net.ServerSocket.implAccept(ServerSocket.java:545) at java.net.ServerSocket.accept(ServerSocket.java:513) at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175) ... 24 more


nhaq1z21 1#

df_sub_2.collect() brings all of the data onto the driver node, which is a very expensive operation on large datasets and can cause stage failures.
Use df.take(10) instead of df.collect() to look at the first ten rows of the output DataFrame, as in the sketch below.
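
For example, a minimal sketch of inspecting a sample without pulling the whole DataFrame onto the driver (using df_sub_2 as defined in the question):

# Fetch at most 10 rows onto the driver instead of the whole DataFrame
sample_rows = df_sub_2.take(10)  # returns a list of up to 10 Row objects
for row in sample_rows:
    print(row["qid"], row["Is spelling correct"])

# Or have Spark print a sample directly, without keeping it in Python
df_sub_2.show(10, truncate=False)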
Alternatively, if you really want to use df.collect(), you can increase the spark.driver.maxResultSize property (default 1g) as shown below and then call df.collect():

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('StackOverflow') \
    .config('spark.driver.maxResultSize', '4g') \
    .getOrCreate()
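
It is safest to pass spark.driver.maxResultSize an explicit size string such as '4g' or '4096m'; a bare number like '4096' falls back to a default unit (bytes, for this property), which would not mean 4 GB. A quick way to confirm the setting is active on the running session:

# Read back the effective value (assumes the `spark` session created above)
print(spark.conf.get('spark.driver.maxResultSize'))  # expected: '4g'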
