My Spark job reads from and writes to S3 buckets. Recently I've been running into an issue where the last few tasks of a stage just hang there: no errors are thrown, they simply make extremely slow progress.
E.g., the last 7 tasks take longer and longer to finish. You can see the median task time is 28 seconds, but the maximum for the last few tasks can be as high as 40 minutes. I'm fairly sure the data isn't skewed; each partition only holds ~100 MB of data, yet it still takes 40 minutes to complete, as shown in the screenshot below.
My question is: what are the potential causes of this? Could it be something like the S3 connection itself? We used to run these Spark jobs against NFS and never had this problem. I've tried almost every configuration I could find to work around it, but still no luck :(
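For context, the "configurations" I mean are mostly the s3a connection and read settings passed through the Spark Hadoop config. The snippet below is just an illustrative sketch of that kind of tuning (the keys are standard fs.s3a.* options, but the values are placeholders, not my exact production settings):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-read-job")
  // s3a connection pool size, socket/connect timeouts and retries (values are illustrative)
  .config("spark.hadoop.fs.s3a.connection.maximum", "200")
  .config("spark.hadoop.fs.s3a.connection.timeout", "60000")
  .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
  .config("spark.hadoop.fs.s3a.attempts.maximum", "10")
  // read pattern hint and readahead for columnar (Parquet) input
  .config("spark.hadoop.fs.s3a.experimental.input.fadvise", "random")
  .config("spark.hadoop.fs.s3a.readahead.range", "1M")
  .getOrCreate()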
Here is a copy of the thread dump from one of the "hanging" executors:
java.base@11.0.10/java.net.SocketInputStream.socketRead0(Native Method)
java.base@11.0.10/java.net.SocketInputStream.socketRead(Unknown Source)
java.base@11.0.10/java.net.SocketInputStream.read(Unknown Source)
java.base@11.0.10/java.net.SocketInputStream.read(Unknown Source)
app//com.amazonaws.thirdparty.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
app//com.amazonaws.thirdparty.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:197)
app//com.amazonaws.thirdparty.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
app//com.amazonaws.thirdparty.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
app//com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
app//com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
app//com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
app//com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
app//com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
app//com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
app//com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
app//com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107)
app//com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
app//com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
app//com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
app//org.apache.hadoop.fs.s3a.S3AInputStream.lambda$read$3(S3AInputStream.java:468)
app//org.apache.hadoop.fs.s3a.S3AInputStream$$Lambda$1124/0x00007f862e16fd08.execute(Unknown Source)
app//org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
app//org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
app//org.apache.hadoop.fs.s3a.Invoker$$Lambda$954/0x00007f86647c4900.execute(Unknown Source)
app//org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
app//org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
app//org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
app//org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:464) => holding Monitor(org.apache.hadoop.fs.s3a.S3AInputStream@1670038891})
java.base@11.0.10/java.io.DataInputStream.read(Unknown Source)
app//org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:102)
app//org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
app//org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
app//org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
app//org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
app//org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:313)
app//org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:268)
app//org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:174)
app//org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
app//org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
app//org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
app//org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
app//scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
app//scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
app//scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
app//org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
app//org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
app//org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
app//org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
app//org.apache.spark.scheduler.Task.run(Task.scala:127)
app//org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
app//org.apache.spark.executor.Executor$TaskRunner$$Lambda$650/0x00007f86653e2440.apply(Unknown Source)
app//org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
app//org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
java.base@11.0.10/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
java.base@11.0.10/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.base@11.0.10/java.lang.Thread.run(Unknown Source)
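As far as I can tell, the dump shows the task thread is simply blocked in SocketInputStream.socketRead0, i.e. waiting for bytes from the S3 connection while reading a Parquet row group, not doing any CPU work. One workaround I'm considering (just a sketch, I haven't confirmed it helps in this job) is to let Spark speculatively re-launch such straggler tasks on other executors:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // re-run tasks that are much slower than the stage median on another executor
  .config("spark.speculation", "true")          // off by default
  .config("spark.speculation.multiplier", "4")  // "slow" = 4x the median task time (illustrative)
  .config("spark.speculation.quantile", "0.9")  // only after 90% of tasks have finished (illustrative)
  .getOrCreate()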