Apache Spark EMR 5.28无法在s3上加载 parquet 文件

k3bvogb1  于 2023-01-21  发布在  Apache
关注(0)|答案(3)|浏览(195)

在EMR集群5.28.0上,从s3读取parquet文件失败,出现以下异常,而在EMR 5.18.0上,同样的操作正常。
我甚至从spark-shell开始尝试:

sqlContext.read.load(("s3://s3_file_path/*")
df.take(5)

但失败,出现相同的例外:

Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 17, ip-x.x.x.x.ec2.internal, executor 1): **org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to download file path: s3://somedir/somesubdir/434560/1658_1564419581.parquet, range: 0-7928, partition values: [empty row], isDataPresent: false**
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:241)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:171)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
**Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.org$apache$spark$sql$execution$datasources$parquet$ParquetFileFormat$$isCreatedByParquetMr(ParquetFileFormat.scala:352)
    at** org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildPrefetcherWithPartitionValues$1.apply(ParquetFileFormat.scala:676)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildPrefetcherWithPartitionValues$1.apply(ParquetFileFormat.scala:579)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader.org$apache$spark$sql$execution$datasources$AsyncFileDownloader$$downloadFile(AsyncFileDownloader.scala:93)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anonfun$initiateFilesDownload$2$$anon$1.call(AsyncFileDownloader.scala:73)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anonfun$initiateFilesDownload$2$$anon$1.call(AsyncFileDownloader.scala:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more

我找不到这个文档。有没有人在EMR 5. 28. 0上遇到过这个问题,并且能够修复这个问题?
在5.28上,我可以读取EMR写入s3的文件,但阅读parquet-go写入的现有parquet文件会引发上述异常,而它在EMR 5.18上工作正常
更新:在检查 parquet 文件,旧的工作,只有5.18有丢失的统计

creator:            null 
file schema:        parquet-go-root 
timestringhr:        BINARY SNAPPY DO:0 FPO:21015 SZ:1949/25676/13.17 VC:1092 ENC:RLE,BIT_PACKED,PLAIN ST:[no stats for this column]
timeseconds:         INT64 SNAPPY DO:0 FPO:22964 SZ:1397/9064/6.49 VC:1092 ENC:RLE,BIT_PACKED,PLAIN ST:[min: 1564419460, max: 1564419581, num_nulls not defined]

其中,在EMR 5.18和5.28上运行的代码类似于

creator:            parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 
extra:              org.apache.spark.sql.parquet.row.metadata = {<schema_here>}    
timestringhr:        BINARY SNAPPY DO:0 FPO:3988 SZ:156/152/0.97 VC:1092 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED ST:[min: 2019-07-29 16:00:00, max: 2019-07-29 16:00:00, num_nulls: 0]
timeseconds:         INT64 SNAPPY DO:0 FPO:4144 SZ:954/1424/1.49 VC:1092 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED ST:[min: 1564419460, max: 1564419581, num_nulls: 0]

这可能导致了NullPointerException。发现了一个与parquet-mr相关的问题https://issues.apache.org/jira/browse/PARQUET-1217。我可以尝试在类路径中包含parquet的更新版本,或者在EMR 6 beta上测试,看看是否可以修复这个问题。

cvxl0en2

cvxl0en21#

尝试添加created_by值到页脚。我在Spark中跟踪了一个NPE到页脚/created_by。如果您正在使用xitongsys/parquet-go,请考虑以下事项:

var writer_version = "parquet-go version 1.0"
...

...
pw, err := writer.NewJSONWriter(schemaStr, fw, 4)
pw.Footer.CreatedBy = &writer_version
atmip9wb

atmip9wb2#

请确保您的Parquet文件不包含具有零行的行组。您可能必须在加载文件时使用读取器调试文件。我们在AWS Glue中遇到此问题,获取“0行的非法行组”。
修复:我们使用Parquet.netnuget包,并限制它写入不包含数据的行组。

8ljdwjyq

8ljdwjyq3#

这很可能是由于EMR承担的角色缺少IAM权限,无法访问文件所在的S3位置。

相关问题