pyspark PySpark gets a 403 from AWS S3 when trying to save a table

pinkon5k · asked 12 months ago · Spark

I am trying to save a PySpark DataFrame to S3 and I get a 403. I am trying to work out whether the problem is on the code side or in the settings inside AWS.
This is the error:

py4j.protocol.Py4JJavaError: An error occurred while calling o87.saveAsTable. : java.util.concurrent.ExecutionException: java.nio.file.AccessDeniedException: s3a://shay-test-bucket/_delta_log: getFileStatus on s3a://shay-test-bucket/_delta_log: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 2Y4EWPC0WM1MW02E; S3 Extended Request ID: ruRAYWkPXzvM29htY10aKa4Ah4XXmawmaPVEMXFTxr0wHHT5n+wVFhkQstgIErz7pxFF8hmdQI0=; Proxy: webproxy.com), S3 Extended Request ID: ruRAYWkPXzvM29htY10aKa4Ah4XXmawmaPVEMXFTxr0wHHT5n+wVFhkQstgIErz7pxFF8hmdQI0=:403 Forbidden
  at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
  at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
  at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
  at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
  at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
  at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
  at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
  at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
  at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
  at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:801)
  at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:811)
  at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:715)
  at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:653)
  at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createDeltaTable$8(DeltaCatalog.scala:152)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$createDeltaTable$1(DeltaCatalog.scala:150)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:57)
  at org.apache.spark.sql.delta.catalog.DeltaCatalog.org$apache$spark$sql$delta$catalog$DeltaCatalog$$createDeltaTable(DeltaCatalog.scala:87)
  at org.apache.spark.sql.delta.catalog.DeltaCatalog$StagedDeltaTableV2.$anonfun$commitStagedChanges$1(DeltaCatalog.scala:489)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:57)
  at org.apache.spark.sql.delta.catalog.DeltaCatalog$StagedDeltaTableV2.commitStagedChanges(DeltaCatalog.scala:451)
  at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:603)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
  at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable(WriteToDataSourceV2Exec.scala:587)
  at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable$(WriteToDataSourceV2Exec.scala:582)
  at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:202)
  at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:236)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:633)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:567)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
  at java.lang.Thread.run(Thread.java:748)

This is how the DataFrame is written:

df.write.format("delta").mode('overwrite').saveAsTable("table1", path=f"s3a://{bucket.name}/")

And this is the Hadoop/PySpark initialization:

return (
            SparkConf()
            .setAppName("MY_APP")
            .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider,"
                                                    "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,"
                                                    "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider")
            .set("fs.s3a.access.key", access_key)
            .set("fs.s3a.secret.key", secret_key)
            .set("fs.s3a.secret.token", token)
            .set("fs.s3a.endpoint", end_point)
            .set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        )


igetnqfo1#

The error you are getting from AWS S3 (403 Forbidden) is an authorization failure: S3 understood the request but refused it, which points to the credentials or bucket permissions rather than to the Spark code itself.
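A quick way to confirm this is to hit the same prefix with plain boto3, outside Spark. This is only a sketch: it assumes boto3 is installed and reuses the access_key / secret_key / token / end_point values from your SparkConf.

import boto3

# Same credentials and endpoint the Spark job uses (variable names assumed from the question).
s3 = boto3.client(
    "s3",
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    aws_session_token=token,
    endpoint_url=end_point,  # boto3 expects a full URL here, e.g. "https://..."
)

# getFileStatus on _delta_log is essentially a LIST on that prefix;
# if this call also raises an AccessDenied/403 error, the problem is on the AWS side.
resp = s3.list_objects_v2(Bucket="shay-test-bucket", Prefix="_delta_log/", MaxKeys=1)
print(resp.get("KeyCount", 0))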

Temporary workaround: set the S3A options explicitly as spark.hadoop.* properties, for example:

.set("spark.hadoop.fs.s3a.impl.disable.cache", "true")
.set("spark.hadoop.fs.s3a.aws.credentials.provider", "your.credentials.provider.class")
.set("spark.hadoop.fs.s3a.access.key", "your-access-key")
.set("spark.hadoop.fs.s3a.secret.key", "your-secret-key")
.set("spark.hadoop.fs.s3a.endpoint", "your-s3-endpoint")
.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.set("spark.hadoop.fs.s3a.attempts.maximum", "3")
.set("spark.hadoop.fs.s3a.fast.upload", "true")
.set("spark.hadoop.fs.s3a.path.style.access", "true")
.set("spark.hadoop.fs.s3a.connection.timeout", "2000")
.set("spark.hadoop.fs.s3a.connection.maximum", "100")
.set("spark.hadoop.fs.s3a.socket.timeout", "2000")
.set("spark.hadoop.fs.s3a.experimental.input.fadvise", "sequential")
.set("spark.hadoop.fs.s3a.multipart.size", "104857600")
.set("spark.hadoop.fs.s3a.buffer.dir", "/mnt/sparkbuffer")
.set("spark.hadoop.fs.s3a.multipart.threshold", "2147483647")
.set("spark.hadoop.fs.s3a.threads.max", "25")
.set("spark.hadoop.fs.s3a.threads.core", "5")
.set("spark.hadoop.fs.s3a.threads.keepalivetime", "60")
.set("spark.hadoop.fs.s3a.multipart.purge", "false")
.set("spark.hadoop.fs.s3a.threads.daemon", "true")
.set("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
.set("spark.hadoop.fs.s3a.committer.name", "magic")
.set("spark.hadoop.fs.s3a.threads.max", "5")
.set("spark.hadoop.fs.s3a.fast.upload.active.blocks", "4")
.set("spark.hadoop.fs.s3a.fast.upload.active.blocks.threshold", "16")
.set("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
.set("spark.hadoop.fs.s3a.fast.upload.active.blocks", "4")
.set("spark.hadoop.fs.s3a.fast.upload.active.blocks.threshold", "16")
.set("spark.hadoop.fs.s3a.fast.upload.retry.count", "5")
.set("spark.hadoop.fs.s3a.fast.upload.retry.interval", "5")
.set("spark.hadoop.fs.s3a.multipart.purge", "false")
.set("spark.hadoop.fs.s3a.multipart.purge.frequency", "300")
.set("spark.hadoop.fs.s3a.multipart.size", "104857600")
.set("spark.hadoop.fs.s3a.multipart.threshold", "2147483647")
.set("spark.hadoop.fs.s


Make sure the Spark application is configured correctly

Make sure your Spark application is actually configured to use the AWS credentials you provide, and that the endpoint and the other settings match your AWS environment, e.g.:

.set("fs.s3a.endpoint", end_point)
