s3开始使用ApacheSpark返回过早的内容长度结束错误,同时在旧的ec2示例上正常工作

9wbgstp7  于 2021-05-18  发布在  Spark
关注(0)|答案(2)|浏览(557)

我们使用s3 bucket作为数据集的数据存储,其中数据存储为Parquet文件。
我们正在awsr6g示例上运行apachespark,这些示例使用hydrosphermist作为代理来启动spark示例中的作业。我们使用本地集群模式,因此每个示例都在docker容器中运行spark workers。4天前,我们的一个ec2示例在从bucket中读取Parquet文件时突然出现了内容长度结束错误,而另一个示例读取的结果很好。然而,第二天,两个正在运行的示例都开始出现这个错误。重新创建它们也没有什么帮助,问题并没有消失,而是发生在每次从s3存储桶读取数据的作业运行中。
误差如下:

Error: RSocket error 0x201 (APPLICATION_ERROR): org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 1977131; received: 849
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)
    at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:101)
    at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:166)
    at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
    at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
    at java.io.FilterInputStream.close(FilterInputStream.java:181)
    at java.io.FilterInputStream.close(FilterInputStream.java:181)
    at java.io.FilterInputStream.close(FilterInputStream.java:181)
    at java.io.FilterInputStream.close(FilterInputStream.java:181)
    at com.amazonaws.services.s3.model.S3ObjectInputStream.abort(S3ObjectInputStream.java:90)
    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:72)
    at org.apache.hadoop.fs.s3a.S3AInputStream.seek(S3AInputStream.java:115)
    at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
    at org.apache.parquet.hadoop.util.H1SeekableInputStream.seek(H1SeekableInputStream.java:46)
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1157)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我们使用的版本:
Spark-2.4.4
水圈雾-1.1.3
hadoop-2.7版
java aws sdk-1.7.4(hadoop 2.7使用相同版本)
jvm-1.8版本
我们使用 s3a:// 读取s3 bucket并使用 org.apache.hadoop.fs.s3a.S3AFileSystem s3a实现。
我认为其中一个可能的问题可能是javaawssdk版本,因为1.7.4是一个非常旧的版本,现在支持它,但是spark给出了2.7作为它的主要hadoop版本,所以这不应该是问题所在。
数据集本身非常小,目前都在10mb以下。
有人遇到过这个问题吗?
edit1:它失败的内容长度往往是相同的,它们不是随机的。所以总是849,7744,664或8192。这取决于具体的工作。从Parquet文件中删除压缩也将最小的数字从696更改为849。

dced5bon

dced5bon1#

编辑:不固定。问题依然存在。然而,事实证明,在局部运行Spark和雾不会产生这个问题。
较新的hadoop2.7版本实际上没有使用awsjavasdk1.7.4。更新到一个更新的spark版本,这个版本附带了一个最近构建的hadoop修复了这个问题。我们现在使用的是spark 3.0.1和hadoop 2.7.4。
另一个问题是,我们同时包含了org.apache。hadoop:hadoop-aws and aws java sdk依赖项。第二个可以安全地移除。在那之后事情又顺利地进行了

whhtz7ly

whhtz7ly2#

这个问题是由于hadoop2.7造成的。升级到hadoop3.2之后,这个问题就解决了。还要注意其他人,以确保您没有冲突的guava依赖(应该是版本27 jre,虽然我完全删除了这个依赖,因为我不需要它)或错误的hadoop aws版本。hadoop aws依赖版本必须是3.2,不能是3.2.1或其他版本。

相关问题