Does Spark allow using an Amazon assumed role and STS temporary credentials for DynamoDB?

Posted by oxosxuxt on 2021-06-03 in Hadoop

I need to fetch data from a DynamoDB table with Spark, using Java. It works with a user's access key and secret key:

final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.input.tableName", tableName);
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
jobConf.set("dynamodb.awsAccessKeyId",  accessKey);
jobConf.set("dynamodb.awsSecretAccessKey", secretKey);
jobConf.set("dynamodb.endpoint", endpoint);

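I need to fetch the data from DynamoDB entirely through Spark using an AWS assumed role and STS (at least for security reasons). Is that possible? I found that AWS S3 can be accessed from Spark with an assumed role (https://issues.apache.org/jira/browse/hadoop-12537, https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html), but I have not found anything similar for DynamoDB.
To obtain the STS temporary credentials I use the following code: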

AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
AssumeRoleRequest assumeRequest = new AssumeRoleRequest()
        .withRoleArn(roleArn)  // arn:aws:iam::XXXXXXX:role/assume-role-DynamoDB-ReadOnly
        .withDurationSeconds(3600)
        .withRoleSessionName("assumed-role-session");
AssumeRoleResult assumeResult = stsClient.assumeRole(assumeRequest);
Credentials credentials = assumeResult.getCredentials();

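Calling credentials.getAccessKeyId(), credentials.getSecretAccessKey() and credentials.getSessionToken() returns the generated temporary credentials. With these credentials I can successfully fetch data from DynamoDB using the AWS SDK for Java AmazonDynamoDBClient (the non-Spark approach).
Is this possible with Spark? Does Spark allow something like jobConf.set("dynamodb.awsSessionToken", sessionToken)?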
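
One practical point behind the question: the credentials requested above expire after 3600 seconds, so a token copied once into the job configuration would presumably stop working on a longer job. Below is a minimal sketch of the refresh-before-expiry idea using only the JDK. `TempCreds`, `RefreshingCreds` and the `fetcher` supplier are hypothetical names that stand in for the `assumeRole` call; they are not part of the AWS SDK or of the DynamoDB connector.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Hypothetical holder for one set of temporary credentials. */
final class TempCreds {
    final String accessKeyId;
    final String secretAccessKey;
    final String sessionToken;
    final Instant expiresAt;

    TempCreds(String accessKeyId, String secretAccessKey,
              String sessionToken, Instant expiresAt) {
        this.accessKeyId = accessKeyId;
        this.secretAccessKey = secretAccessKey;
        this.sessionToken = sessionToken;
        this.expiresAt = expiresAt;
    }
}

/** Hypothetical cache that fetches new credentials shortly before the old ones expire. */
final class RefreshingCreds {
    private final Supplier<TempCreds> fetcher; // stands in for stsClient.assumeRole(...)
    private final Duration margin;             // how long before expiry to refresh
    private final Supplier<Instant> now;       // injectable time source
    private TempCreds current;

    RefreshingCreds(Supplier<TempCreds> fetcher, Duration margin, Supplier<Instant> now) {
        this.fetcher = fetcher;
        this.margin = margin;
        this.now = now;
    }

    /** Returns cached credentials, fetching again once the safety margin is reached. */
    synchronized TempCreds get() {
        Instant t = now.get();
        if (current == null || !t.isBefore(current.expiresAt.minus(margin))) {
            current = fetcher.get();
        }
        return current;
    }
}

final class RefreshingCredsDemo {
    public static void main(String[] args) {
        // Placeholder values only; a real fetcher would call STS.
        RefreshingCreds cache = new RefreshingCreds(
                () -> new TempCreds("ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN",
                        Instant.now().plusSeconds(3600)),
                Duration.ofMinutes(5),
                Instant::now);
        System.out.println(cache.get().sessionToken);
    }
}
```

In real code the fetcher would wrap the assumeRole call shown above and copy getAccessKeyId(), getSecretAccessKey(), getSessionToken() and getExpiration() from the returned Credentials into the holder.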

Answer #1 (dwbf0jvd)

From looking at the code, you can probably use dynamodb.customAWSCredentialsProvider together with an instance of com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider to get what you need working.
https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java#L30
https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html
Edit: So this turned out to be a bit harder than I first thought. I ended up implementing my own wrapper around STSAssumeRoleSessionCredentialsProvider.

package foo.bar;

import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.AWSSessionCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

public class HadoopSTSAssumeRoleSessionCredentialsProvider
        implements AWSSessionCredentialsProvider, Configurable {

    private static final String ROLE_ARN_CONF = "assumed.creds.role.arn";
    private static final String SESSION_NAME_CONF = "assumed.creds.session.name";

    private Configuration configuration;
    private STSAssumeRoleSessionCredentialsProvider delegate;

    public AWSSessionCredentials getCredentials() {
        return delegate.getCredentials();
    }

    public void refresh() {
        delegate.refresh();
    }

    public void setConf(Configuration configuration) {
        this.configuration = configuration;
        String roleArn = configuration.get(ROLE_ARN_CONF);
        String sessionName = configuration.get(SESSION_NAME_CONF);

        if (roleArn == null || roleArn.isEmpty() || sessionName == null || sessionName.isEmpty()) {
            throw new IllegalStateException("Please set " + ROLE_ARN_CONF + " and "
                    + SESSION_NAME_CONF + " before use.");
        }
        delegate = new STSAssumeRoleSessionCredentialsProvider.Builder(
                roleArn, sessionName).build();
    }

    public Configuration getConf() {
        return configuration;
    }
}

Then you can use it like this:

val ddbConf: JobConf = new JobConf(sc.hadoopConfiguration)

ddbConf.set("dynamodb.customAWSCredentialsProvider",
    "foo.bar.HadoopSTSAssumeRoleSessionCredentialsProvider")
ddbConf.set("assumed.creds.role.arn", "roleArn")
ddbConf.set("assumed.creds.session.name", "sessionName")
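
As for why a wrapper is needed at all: as far as I can tell (this is an assumption, check the connector source), the class named in dynamodb.customAWSCredentialsProvider is instantiated reflectively through a no-argument constructor and then handed the job configuration if it is Configurable. STSAssumeRoleSessionCredentialsProvider is built through its Builder with a role ARN and session name, so it cannot be named directly. The JDK-only sketch below illustrates that loading pattern. `MapConfigurable`, `DemoProvider` and `ProviderLoader` are hypothetical stand-ins, with a plain Map in place of the Hadoop Configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for org.apache.hadoop.conf.Configurable, using a plain Map. */
interface MapConfigurable {
    void setConf(Map<String, String> conf);
}

/** Hypothetical provider: no-argument constructor, settings are read in setConf. */
class DemoProvider implements MapConfigurable {
    String roleArn;
    String sessionName;

    public DemoProvider() { }

    @Override
    public void setConf(Map<String, String> conf) {
        roleArn = conf.get("assumed.creds.role.arn");
        sessionName = conf.get("assumed.creds.session.name");
        if (roleArn == null || roleArn.isEmpty()
                || sessionName == null || sessionName.isEmpty()) {
            throw new IllegalStateException("Role ARN and session name must be set.");
        }
    }
}

/** Hypothetical loader that mimics instantiating a provider class by name. */
final class ProviderLoader {
    static final String PROVIDER_KEY = "dynamodb.customAWSCredentialsProvider";

    static Object load(Map<String, String> conf) {
        String className = conf.get(PROVIDER_KEY);
        if (className == null || className.isEmpty()) {
            throw new IllegalArgumentException(PROVIDER_KEY + " is not set.");
        }
        Object instance;
        try {
            // Only works when the class has a no-argument constructor.
            instance = Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
        // Pass the configuration on, so the provider can read its own keys.
        if (instance instanceof MapConfigurable) {
            ((MapConfigurable) instance).setConf(conf);
        }
        return instance;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(PROVIDER_KEY, DemoProvider.class.getName());
        conf.put("assumed.creds.role.arn", "arn:aws:iam::123456789012:role/demo"); // placeholder
        conf.put("assumed.creds.session.name", "demo-session");                    // placeholder
        DemoProvider provider = (DemoProvider) load(conf);
        System.out.println(provider.roleArn + " / " + provider.sessionName);
    }
}
```

This is also why the wrapper reads assumed.creds.role.arn and assumed.creds.session.name in setConf rather than in a constructor.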
