PySpark GCP UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

hrysbysz published on 2023-01-17 in Apache

I'm new to PySpark, so I hope someone can help. I'm trying to read parquet files stored in a GCP bucket. The files are partitioned by date, e.g. bucket-name/year={}/month={}/day={}
For a given file we have the following schema history:
1. Until March, columns x and y used the float data type.
2. Since March, these two columns use the double data type.
From what I can see, PySpark has no problem treating float and double as compatible data types. *(The similar examples of this error I found online involve incompatible data types, e.g. string and float.)* However, we hit this strange issue if we try to read all the available data for this file:

# i.e. read all the data we have ever received for this file
path = 'bucket-name/year=*/month=*/day=*'

df = spark.read.format('parquet').load(path)
df.cache().count()

We get the error below. (Note that we do not get this error if we just do df.count(); it only happens if we cache first.)
Also, the resulting schema from spark.read shows column x with a float data type, so as far as the schema is concerned Spark happily reads the data and reports the dtype as float; however, as soon as we cache, things go wrong.
Hopefully the details of the situation are clear enough :)

An error occurred while calling o923.count. :
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 15 in stage 41.0 failed 4 times, most recent failure: Lost task 15.3 in stage 41.0 (TID 13228, avroconversion-validation-w-1.c.vf-gned-nwp-live.internal, executor 47): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at org.apache.parquet.column.Dictionary.decodeToFloat(Dictionary.java:53)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToFloat(ParquetDictionary.java:41)
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getFloat(OnHeapColumnVector.java:423)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:125)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:359)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


5f0d552i1#

According to the documentation:
The cache() method is shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory).
cache() is a lazy operation, and if you look at the MEMORY_ONLY section you will notice that cache() tries to store the RDD/DataFrame as deserialized Java objects in the JVM [once you call an action on the cached RDD/DataFrame], so the problem only surfaces when the objects in your RDD/DataFrame actually have to be deserialized.
If you call df.count() without caching and without any transformations on df, Spark does not need to deserialize your objects.
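If the goal is simply to cache the full history despite the float-to-double change, one possible workaround (a sketch under assumptions, not taken from the answer above) is to read the pre-March and post-March partitions separately, so that each read infers one consistent type, cast the float columns up to double, and union the two DataFrames before caching. The bucket paths, month globs, year value and the column names x and y below are placeholders based on the question's description:

# Workaround sketch; paths, month globs and column names are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

base = 'bucket-name'  # bucket root, as in the question

# Read the pre-March partitions (x and y stored as float) and the partitions
# written from March onwards (x and y stored as double) as two separate
# DataFrames, so each read only sees a single physical type.
# 'basePath' keeps the year/month/day partition columns in the result.
old_df = (spark.read
          .option('basePath', base)
          .parquet(f'{base}/year=2022/month={{1,2}}/day=*'))
new_df = (spark.read
          .option('basePath', base)
          .parquet(f'{base}/year=2022/month={{3,4,5,6,7,8,9,10,11,12}}/day=*'))

# Cast the float columns up to double so both halves share one schema.
for c in ('x', 'y'):
    old_df = old_df.withColumn(c, F.col(c).cast(DoubleType()))

# unionByName tolerates differing column order between the two halves.
df = old_df.unionByName(new_df)

df.cache().count()  # each file is decoded with the type it was written with

This keeps the cached plan free of float/double mixing: every parquet file is decoded with its own physical type, and the final DataFrame exposes x and y as double throughout.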
