我在读s3的数据集。为此,我要 fs.s3a.aws.credentials.provider
至 org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider
. 下面是从s3读取的代码:
def read_dataset(id, path):
aws_tokens = get_tokens(id)
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', aws_access_key)
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', aws_secret_key)
spark._jsc.hadoopConfiguration().set('fs.s3a.session.token', aws_session_token)
spark._jsc.hadoopConfiguration().set('fs.s3a.connection.ssl.enabled', 'true')
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
df = spark.read.parquet(path)
return df
我想读两个数据集,称它们为 d1
以及 d2
. 我使用api获得aws凭据,并且工作正常。我想执行两个数据集的并集。
现在有以下工作:
df1 = read_dataset(file_1_id, file_1_path)
df1.count() #gives output
spark._jsc.hadoopConfiguration().unset('fs.s3a.access.key')
spark._jsc.hadoopConfiguration().unset('fs.s3a.secret.key')
spark._jsc.hadoopConfiguration().unset('fs.s3a.session.token')
spark._jsc.hadoopConfiguration().unset('fs.s3a.aws.credentials.provider')
df2 = read_dataset(file_2_id, file_2_path)
df2.count() #gives output
但是,以下操作不起作用:
df1 = read_dataset(file_1_id, file_1_path)
df1.count() #gives output
_ = df1.persist().count()
spark._jsc.hadoopConfiguration().unset('fs.s3a.access.key')
spark._jsc.hadoopConfiguration().unset('fs.s3a.secret.key')
spark._jsc.hadoopConfiguration().unset('fs.s3a.session.token')
spark._jsc.hadoopConfiguration().unset('fs.s3a.aws.credentials.provider')
df2 = read_dataset(file_2_id, file_2_path) #throws a java.nio.File.AccessDeniedException
我查过了 spark._jsc.hadoopConfiguration()
,我看到 accessKey
, secretKey
以及 sessionToken
设置正确。那我为什么要得到一个 AccessDeniedException
?
另外,在第一种情况下,由于数据集没有持久化,因此无法执行联合。这是因为一旦我更改了aws标记,我就不能再读取第一个数据集了,而且由于spark将懒散地进行计算,所以我得到了一个 AccessDeniedException
.
暂无答案!
目前还没有任何答案,快来回答吧!