Spark write to S3 occasionally fails

lnxxn5zx posted on 2021-07-14 in Spark

Good day,
I have a Spark job that reads some data from S3, processes it, and writes the results back to S3. The code looks like this:

val spark = SparkSession
  .builder()
  .appName("AppName")
  // The serializer is a static conf and must be set before the session is
  // created; calling spark.conf.set() afterwards has no effect on it.
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", s3AccessKey)
hadoopConf.set("fs.s3a.secret.key", s3SecretKey)
hadoopConf.set("fs.s3a.session.token", s3SessionToken)
hadoopConf.set("fs.s3a.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")

val sc = spark.sparkContext

val inputSchema = StructType(
  Array(
    StructField(fieldName, StringType, nullable = true)
  )
)

val inputData = spark.read
  .format("csv")
  .schema(inputSchema)
  .load(inputPath)

// Process data and store results in finalDf

finalDf.write
  .format("csv") // built-in CSV source; "com.databricks.spark.csv" is the legacy external package
  .option("compression", "gzip")
  .option("delimiter", fileDelimiter)
  .option("header", "false")
  .mode("append")
  .save(outputPath)
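If the failures turn out to be transient, one common mitigation is a small job-level retry around the write. The helper below is a generic sketch in plain Scala; wrapping the `finalDf.write ... save(outputPath)` call in it is my assumption, not something from the original job:

```scala
import scala.util.{Failure, Success, Try}

// Re-run `op` up to `maxAttempts` times, rethrowing the last failure
// if every attempt fails.
def retry[T](maxAttempts: Int)(op: => T): T =
  Try(op) match {
    case Success(v)                     => v
    case Failure(_) if maxAttempts > 1  => retry(maxAttempts - 1)(op)
    case Failure(e)                     => throw e
  }

// Hypothetical usage with the write above:
//   retry(3) {
//     finalDf.write.format("csv").mode("append").save(outputPath)
//   }
```

One caveat: with `mode("append")`, a retried attempt can leave behind partial or duplicate files from the failed run, so retrying into a fresh temporary prefix and promoting the output afterwards is often safer than retrying an append in place.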

The code works most of the time, but occasionally it fails partway through writing the results to S3 (when 600 of the 800 partitions have already been completed and written), with the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 654 in stage 8.0 failed 4 times, most recent failure: Lost task 654.3 in stage 8.0 (TID 1718, 10.61.61.105, executor 1): org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a://bucket_name/folder_name/_temporary/0/_temporary/attempt_20210422190715_0008_m_000654_1718/part-00654-7edcb33f-a042-456d-92ce-52db9cb3b81e-c000.csv.gz: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8VP3D75447NY14G5; S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=), S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8VP3D75447NY14G5; S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2184)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:749)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
    at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
    at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8VP3D75447NY14G5; S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=), S3 Extended Request ID: IOoLbK54y2k874ZQgvH90H5vVisq47HojfqKhLFdB0TyrO0+zRPcgSCcNj0VjMjk/zT1ggQvTb0=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$4(S3AFileSystem.java:1235)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1232)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2169)
    ... 23 more
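For transient task-level failures like this, the S3A connector's own retry settings are worth checking before anything else. A sketch of tightening them, assuming Hadoop 3.x property names (only `fs.s3a.attempts.maximum` exists on Hadoop 2.x, so verify against your Hadoop version):

```scala
// Sketch: make the S3A client retry transient failures more aggressively.
val hc = spark.sparkContext.hadoopConfiguration
hc.set("fs.s3a.attempts.maximum", "20")  // AWS SDK-level request retries
hc.set("fs.s3a.retry.limit", "7")        // S3A-level retries for retryable errors
hc.set("fs.s3a.retry.interval", "500ms") // backoff between S3A retries
```

Note, however, that a 400 Bad Request is generally treated as non-retryable, so if the root cause is an invalid or expired credential, more retries will not help.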

Is there anything I might be missing? Does the session token have a timeout? That is the only cause I can think of.
Thanks
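On the token-timeout hypothesis: STS session tokens do expire (tokens from `GetSessionToken` live between 15 minutes and 36 hours, depending on the requested duration), and once one expires mid-job, subsequent S3 requests fail, which would match a write dying after 600 of 800 partitions. A sketch of a pre-write guard, under the assumption that `tokenExpiry` is captured when the credentials are issued (STS returns it as `Credentials.getExpiration`):

```scala
import java.time.{Duration, Instant}

// How much lifetime the session token has left at `now`.
def tokenTimeLeft(tokenExpiry: Instant, now: Instant): Duration =
  Duration.between(now, tokenExpiry)

// True when the remaining lifetime covers the estimated write duration,
// i.e. the token should not expire mid-write.
def safeToWrite(tokenExpiry: Instant, estimatedWrite: Duration, now: Instant): Boolean =
  tokenTimeLeft(tokenExpiry, now).compareTo(estimatedWrite) > 0
```

If the check fails, refreshing the credentials (or requesting a longer `DurationSeconds` up front) before starting the write would be the corresponding fix.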
