How to avoid java.io.StreamCorruptedException: invalid stream header: 204356EC when using PySpark?

cqoc49vn asked on 2022-12-03 in Spark

Whenever I try to read a Spark dataset with PySpark and convert it to a pandas DataFrame for modeling, I get the following error at the toPandas() step: java.io.StreamCorruptedException: invalid stream header: 204356EC
I am not a Java programmer (hence PySpark), so these errors can be rather cryptic to me. I have tried the following, but the problem persists:

Verified that the Spark and PySpark versions match, as confirmed by the logging in the test script below.
test.py:

import logging

from pyspark.sql import SparkSession
from pyspark import SparkContext

import findspark
findspark.init()

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

sc = SparkContext('local[*]', 'test')
spark = SparkSession(sc)
logging.info('Spark location: {}'.format(findspark.find()))
logging.info('PySpark version: {}'.format(spark.sparkContext.version))

logging.info('Reading spark input dataframe')
test_df = spark.read.csv('./data', header=True, sep='|', inferSchema=True)

logging.info('Converting spark DF to pandas DF')
pandas_df = test_df.toPandas()
logging.info('DF record count: {}'.format(len(pandas_df)))
sc.stop()

Output:

$ python ./test.py   
21/05/13 11:54:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-05-13 11:54:34 INFO     Spark location: /Users/username/server/spark-3.1.1-bin-hadoop2.7
2021-05-13 11:54:34 INFO     PySpark version: 3.1.1
2021-05-13 11:54:34 INFO     Reading spark input dataframe
2021-05-13 11:54:42 INFO     Converting spark DF to pandas DF                   
21/05/13 11:54:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/05/13 11:54:45 ERROR TaskResultGetter: Exception while getting task result
java.io.StreamCorruptedException: invalid stream header: 204356EC
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:936)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:394)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:64)
    at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:64)
    at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:123)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(TaskResultGetter.scala:97)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:63)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "./test.py", line 23, in <module>
    pandas_df = test_df.toPandas()
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/pandas/conversion.py", line 141, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.io.StreamCorruptedException: invalid stream header: 204356EC
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
    at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3519)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

ldxq2e6h1#

The issue was solved for me by making sure the serialization options (registered in the config under spark.serializer) were not incompatible with pyarrow (which is typically used during pandas-to-PySpark conversion and vice versa, if you have it enabled).
The fix was to remove the often-recommended spark.serializer: org.apache.spark.serializer.KryoSerializer from the config and rely instead on the potentially slower default.
For context, our setup was an ML version of a Databricks Spark cluster (v7.3).
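A minimal sketch (not from the original answer) of what this looks like in practice: build the session without setting spark.serializer, then confirm which serializer is in effect. The config key spark.serializer and the KryoSerializer class name are standard Spark options; everything else here is illustrative.

```python
from pyspark.sql import SparkSession

# Build the session WITHOUT .config('spark.serializer',
# 'org.apache.spark.serializer.KryoSerializer'), so Spark falls back to the
# default Java serializer.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

# Confirm which serializer is actually in effect; the key is absent when the
# default is used.
print(spark.sparkContext.getConf().get('spark.serializer', 'default (JavaSerializer)'))
```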


sr4lhrrt2#

I ran into this exception with Spark Thrift Server.
The driver version was different from the cluster version.
In my case, I removed the setting below, so that the driver's version is used across the whole cluster.

spark.yarn.archive=hdfs:///spark/3.1.1.zip
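As a quick sanity check (a sketch of my own, not part of the original answer), you can compare the PySpark version installed on the driver with the version the cluster-side JVM reports before calling toPandas(); both attributes shown are standard PySpark APIs.

```python
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# pyspark.__version__ is the version installed on the driver;
# spark.version is the version reported by the JVM / cluster side.
driver_version = pyspark.__version__
cluster_version = spark.version

if driver_version != cluster_version:
    raise RuntimeError(
        f'Version mismatch: driver PySpark {driver_version} '
        f'vs cluster Spark {cluster_version}'
    )
```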
