我能够成功地运行我的flink作业,这节省了使用s3的时间 ./bin/flink run ...
.
为此,我必须将flink-s3-fs-presto jar复制到我的 $FLINK_HOME/lib
文件夹中,我还必须配置我的s3连接详细信息 flink-conf.yaml
:
您需要在flink的flink-conf.yaml中配置s3.access-key和s3.secret-key:
s3.access-key: your-access-key
s3.secret-key: your-secret-key
来源:flink aws docs
我还必须设置一个属性 s3.endpoint
因为我使用的是ibmcloud的s3。
当我使用 ./bin/flink run
.
但是,当我尝试从ide(intellij)运行作业时,出现以下错误:
org.apache.flink.runtime.client.jobexecutionexception:无法初始化任务“datasink(textoutputformat(s3://x/folder)-utf-8)”:无法从服务端点加载凭据
我在ide运行作业中设置了一个环境变量, FLINK_CONF_DIR
指向我的flink-conf.yaml,我可以看到我的配置属性被选中了:
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.access-key,****
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.secret-key,****
11:04:39,487 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.endpoint, s3-api.us-geo.objectstorage.softlayer.net
但是,当我从ide运行时,有一个错误提示这些属性没有传递到presto库:
Caused by: org.apache.flink.fs.s3presto.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
另外,为了验证这个理论,如果我在从ide运行时单步执行代码,我可以看到我的endpoint属性没有被应用:
... 深入到hadoop配置中,我可以看到flink配置是一个空Map:
再深入一点,我可以看到 org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem()
正在创建新的空配置:
// this "default" initialization makes sure that the FileSystem class works
// even when not configured with an explicit Flink configuration, like on
// JobManager or TaskManager setup
if (FS_FACTORIES.isEmpty()) {
initialize(new Configuration());
}
如何配置 s3.access-key
, s3.secret-key
以及 s3.endpoint
从ide运行时的属性?
2条答案
按热度按时间xriantvc1#
创建core-site.xml
并添加
g0czyy6m2#
打个电话就行了
之前
会解决问题的。
请记住,您仍然必须将密钥和访问密钥放在flink-conf.yaml中。