Good day,
I have a Spark job that reads some data from S3, processes it, and writes the results back to S3. The code looks like this:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession
  .builder()
  .appName("AppName")
  .getOrCreate()

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", s3AccessKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", s3SecretKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.session.token", s3SessionToken)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val sc = spark.sparkContext

val inputSchema = StructType(
  Array(
    StructField(fieldName, StringType, nullable = true)
  )
)

val inputData = spark.read
  .format("csv")
  .schema(inputSchema)
  .load(inputPath)

// Process data and store results in finalDf

finalDf.write
  .format("com.databricks.spark.csv")
  .option("compression", "gzip")
  .option("delimiter", fileDelimiter)
  .option("header", "false")
  .mode("append")
  .save(outputPath)
The code works most of the time, but it occasionally fails partway through writing the results to S3 (after 600 of the 800 partitions have already been written), with the following exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 654 in stage 8.0 failed 4 times, most recent failure: Lost task 654.3 in stage 8.0 (TID 1718, 10.61.61.105, executor 1): org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a://bucket_name/folder_name/_temporary/0/_temporary/attempt_20210422190715_0008_m_000654_1718/part-00654-7edcb33f-a042-456d-92ce-52db9cb3b81e-c000.csv.gz: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8VP3D75447NY14G5; S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=), S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8VP3D75447NY14G5; S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2184)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:749)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8VP3D75447NY14G5; S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=), S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$4(S3AFileSystem.java:1235)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1232)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2169)
... 23 more
Is there anything I might be missing? Does the session token have a timeout? That is the only cause I can think of.
Thanks
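To illustrate the timeout hypothesis: STS temporary credentials do carry an expiration timestamp, so one thing I am considering is a pre-flight check along these lines before launching the job. This is just a sketch; `tokenOutlivesJob` is a hypothetical helper, and the `expiration` value would come from wherever the session token was issued (e.g. the `Credentials` object returned by STS):

```scala
import java.time.{Duration, Instant}

// Hypothetical pre-flight check: compare the session token's expiration
// timestamp against the expected duration of the Spark job, so the job is
// not started with a token that would expire mid-write.
def tokenOutlivesJob(expiration: Instant,
                     estimatedJobDuration: Duration,
                     now: Instant = Instant.now()): Boolean = {
  val remaining = Duration.between(now, expiration)
  remaining.compareTo(estimatedJobDuration) > 0
}
```

If the token's remaining lifetime is shorter than the job's expected runtime, that would be consistent with the write succeeding for the first few hundred partitions and then failing with a 400 once the token expires.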