从ide运行flink时如何设置presto.s3.属性?

8oomwypt  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(693)

我能够成功地运行我的flink作业,这节省了使用s3的时间 ./bin/flink run ... .
为此,我必须将flink-s3-fs-presto jar复制到我的 $FLINK_HOME/lib 文件夹中,我还必须配置我的s3连接详细信息 flink-conf.yaml :
您需要在flink的flink-conf.yaml中配置s3.access-key和s3.secret-key:

s3.access-key: your-access-key
s3.secret-key: your-secret-key

来源:flink aws docs
我还必须设置一个属性 s3.endpoint 因为我使用的是ibmcloud的s3。
当我使用 ./bin/flink run .
但是,当我尝试从ide(intellij)运行作业时,出现以下错误:
org.apache.flink.runtime.client.jobexecutionexception:无法初始化任务“datasink(textoutputformat(s3://x/folder)-utf-8)”:无法从服务端点加载凭据
我在ide运行作业中设置了一个环境变量, FLINK_CONF_DIR 指向我的flink-conf.yaml,我可以看到我的配置属性被选中了:

11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.access-key,****
11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.secret-key,****
11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.endpoint, s3-api.us-geo.objectstorage.softlayer.net

但是,当我从ide运行时,有一个错误提示这些属性没有传递到presto库:

Caused by: org.apache.flink.fs.s3presto.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint

另外,为了验证这个理论,如果我在从ide运行时单步执行代码,我可以看到我的endpoint属性没有被应用:

... 深入到hadoop配置中,我可以看到flink配置是一个空Map:

再深入一点,我可以看到 org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem() 正在创建新的空配置:

// this "default" initialization makes sure that the FileSystem class works
        // even when not configured with an explicit Flink configuration, like on
        // JobManager or TaskManager setup
        if (FS_FACTORIES.isEmpty()) {
            initialize(new Configuration());
        }

如何配置 s3.access-key , s3.secret-key 以及 s3.endpoint 从ide运行时的属性?

xriantvc

xriantvc1#

创建core-site.xml

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>xxxx</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>xxxxx</value>
    </property>

</configuration>

并添加

Map par = new HashMap();par.put("fs.hdfs.hadoopconf", "path to core-site.xml";
    ParameterTool pt = ParameterTool.fromMap(par);
    env.getConfig().setGlobalJobParameters(pt);
g0czyy6m

g0czyy6m2#

打个电话就行了

FileSystem.initialize(GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")));

之前

env.execute()

会解决问题的。
请记住,您仍然必须将密钥和访问密钥放在flink-conf.yaml中。

相关问题