将pyspark df转换为pandas时引发的异常是等待结果

f45qwnt8  于 2021-07-14  发布在  Spark
关注(0)|答案(2)|浏览(609)

我正在尝试使用自定义项进行一些计算。但是在计算之后,当我尝试将pysparkDataframe转换为pandas时,它给了我 org.apache.spark.SparkException: Exception thrown in awaitResult: 我会写下可复制的代码。

import pandas as pd
import numpy as np
import time 

n = 10000
sample_df = pd.DataFrame(np.random.rand(n,n))
sample_df.columns = sample_df.columns.astype(str)
sample_df.index = sample_df.index.astype(str)
sample_df.loc['start'] = np.random.rand(n)
sample_df.loc['null'] = np.random.rand(n)
sample_df.loc['conv'] = np.random.rand(n)
sample_df["start"] = 0.0
sample_df["null"] = np.random.rand(sample_df.shape[0])
sample_df["conv"] = np.random.rand(sample_df.shape[0])
sample_df.index.name = 'from'

from pyspark.sql.types import StringType
from pyspark.sql import functions as F

channels = [channel for channel in sample_df.columns if channel not in ['start', 'null', 'conv']]
channels_df = spark.createDataFrame(channels, StringType()).toDF(*['channel'])

from pyspark.sql.functions import udf

@udf("float")
def removal_effects_udf(channel):
  global sample_df

  conversion_rate=0.0313
  removal_df = sample_df.drop(channel, axis=1).drop(channel, axis=0)
  row_sum = pd.DataFrame(float(1) - removal_df.sum(axis=1), columns = ["value"])
  null_pct = row_sum[row_sum['value']!=0].reset_index()
  null_pct.set_index('from', inplace=True)
  removal_df['null']  = removal_df.index.to_series().map(null_pct['value']).fillna(removal_df['null'])
  removal_df.loc['null']['null'] = 1.0
  removal_to_conv = removal_df[['null', 'conv']].drop(['null', 'conv'], axis=0)
  removal_to_non_conv = removal_df.drop(['null', 'conv'], axis=1).drop(['null', 'conv'], axis=0)
  removal_inv_diff = np.linalg.inv(np.identity(len(removal_to_non_conv.columns)) - np.asarray(removal_to_non_conv))
  removal_dot_prod = np.dot(removal_inv_diff, np.asarray(removal_to_conv))
  removal_cvr = pd.DataFrame(removal_dot_prod, index=removal_to_conv.index)[[1]].loc['start'].values[0]
  removal_effect = 1 - removal_cvr / conversion_rate
  return float(removal_effect)

channels_df = channels_df.withColumn("removal_effect", removal_effects_udf(F.col("channel"))).toPandas()
channels_df_pandas = channels_df.toPandas()

完成此操作后,出现以下错误:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-1959014423699154> in <module>
----> 1 channels_df = channels_df.withColumn("removal_effect", removal_effects_udf(F.col("channel"))).toPandas()

/databricks/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
    106                     # Rename columns to avoid duplicated column names.
    107                     tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
--> 108                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
    109                     if len(batches) > 0:
    110                         table = pyarrow.Table.from_batches(batches)

/databricks/spark/python/pyspark/sql/pandas/conversion.py in _collect_as_arrow(self)
    244         finally:
    245             # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 246             jsocket_auth_server.getResult()
    247 
    248         # Separate RecordBatches from batch order indices in results

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
    126     def deco(*a,**kw):
    127         try:
--> 128             return f(*a,**kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o32146.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:431)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
    at sun.reflect.GeneratedMethodAccessor697.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 160 in stage 1221.0 failed 4 times, most recent failure: Lost task 160.3 in stage 1221.0 (TID 161215, 10.0.1.18, executor 582): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:618)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:607)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at scala.collection.AbstractIterator.to(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
    at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:71)
    ... 38 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2402)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3631)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3635)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3612)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3689)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3687)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3612)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3611)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:144)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:146)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:141)
    at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:115)
    at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:108)
    at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$1(SocketAuthServer.scala:62)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:62)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:618)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:607)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at scala.collection.AbstractIterator.to(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
    at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:71)
    ... 38 more

如何解决这个问题?
我使用的是databricks,集群有488个核| 1.75tb | spark 3.0.0
编辑:
对于user@wwnde提供的解决方案,我仍然得到一个错误:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-1959014423698939> in <module>
      5 
      6 spark.conf.set("spark.sql.execution.arrow.enabled", "true")
----> 7 channels_df_pandas = channels_df.select("*").toPandas()

/databricks/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
    106                     # Rename columns to avoid duplicated column names.
    107                     tmp_column_names = ['col_{}'.format(i) for i in range(len(self.columns))]
--> 108                     batches = self.toDF(*tmp_column_names)._collect_as_arrow()
    109                     if len(batches) > 0:
    110                         table = pyarrow.Table.from_batches(batches)

/databricks/spark/python/pyspark/sql/pandas/conversion.py in _collect_as_arrow(self)
    244         finally:
    245             # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 246             jsocket_auth_server.getResult()
    247 
    248         # Separate RecordBatches from batch order indices in results

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
    126     def deco(*a,**kw):
    127         try:
--> 128             return f(*a,**kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o32697.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:431)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94)
    at sun.reflect.GeneratedMethodAccessor697.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 1223.0 failed 4 times, most recent failure: Lost task 7.3 in stage 1223.0 (TID 161666, 10.0.1.26, executor 597): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 644, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/databricks/spark/python/pyspark/worker.py", line 463, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/databricks/spark/python/pyspark/worker.py", line 254, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/databricks/spark/python/pyspark/worker.py", line 76, in read_command
    command = serializer.loads(command.value)
  File "/databricks/spark/python/pyspark/broadcast.py", line 154, in value
    self._value = self.load_from_path(self._path)
  File "/databricks/spark/python/pyspark/broadcast.py", line 131, in load_from_path
    return self.load(f)
  File "/databricks/spark/python/pyspark/broadcast.py", line 137, in load
    return pickle.load(file)
OSError: [Errno 12] Cannot allocate memory

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:585)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:538)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:731)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.hasNext(ArrowConverters.scala:117)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at scala.collection.AbstractIterator.to(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$7(Dataset.scala:3633)
    at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2401)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2478)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2427)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2426)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2426)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1131)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1131)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1131)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2678)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2625)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2613)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:917)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2402)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$6(Dataset.scala:3631)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3(Dataset.scala:3635)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$3$adapted(Dataset.scala:3612)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3689)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:247)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:828)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:76)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:197)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3687)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3612)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3611)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:144)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:146)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:141)

    at java.lang.Thread.run(Thread.java:748)
hrysbysz

hrysbysz1#

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers

spark.conf.set("spark.sql.execution.arrow.enabled", "true") 
channels_df_pandas = channels_df.select("*").toPandas()
0g0grzrc

0g0grzrc2#

代码的主要问题是 toPandas 有效地将所有数据带到驱动程序节点的函数—集群中的内存和内核总量与此无关—驱动程序节点大小是主要瓶颈(当然可以增加驱动程序节点大小)。我还看到您引用了来自udf的全局变量-理论上它应该被广播,但这仍然是一个糟糕的做法。
要真正解决这个问题,您需要修改您的方法,使您的代码完全分布:
摆脱 toPandas -最好将结果写在某个地方,然后以其他方式访问它们—看起来您的数据太多了。
不喜欢使用全局变量
此外,建议使用速度更快的Pandas自定义项,而不是“普通自定义项”。

相关问题