是否可以将riak cs与apache flink一起使用?

dwthyt8l  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(300)

我想配置 filesystem 状态后端和 zookeeper 恢复模式:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

如你所见,我应该具体说明 checkpointdir 以及 storageDir 参数,但我没有任何apache flink支持的文件系统(如hdfs或amazons3)。但我已经安装了RIAKCS集群(似乎它与s3兼容)。
那么,我可以将riak cs与apache flink一起使用吗?如果可能的话:如何配置apacheflink与riakcs一起工作?

6tqwzwtp

6tqwzwtp1#

答:如何加入apache flink和riak cs?
riak cs具有s3(版本2)兼容接口。因此,可以使用hadoop中的s3文件系统适配器来处理riakcs。
我不知道为什么,但是apache flink在fat jar中只有部分hadoop文件系统适配器( lib/flink-dist_2.11-1.0.1.jar )即有ftp文件系统( org.apache.hadoop.fs.ftp.FTPFileSystem )但没有s3文件系统(即。 org.apache.hadoop.fs.s3a.S3AFileSystem ). 所以,有两种方法可以解决这个问题:
使用hadoop安装中的这些适配器。我没有试过这个,但似乎您应该只配置hadoop\u classpath或hadoop\u home evn变量。
monky补丁apache flink并下载所需的jar到 <flink home>/lib 目录
所以,我选择第二种方式是因为我不想在我的环境中配置hadoop。您可以从hadoop dist或internet复制JAR:

curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar

如您所见,我使用的是旧版本,因为这样的版本在hadoop2.7.2中使用,我使用的flink与这个版本的hadoop兼容。
仅供参考:如果您在自己的流中使用这些jar的最新版本,这样的黑客可能会导致问题。为了避免与不同版本相关的问题,当您使用flow构建fat jar时,您可以重新定位包,例如(我使用的是gradle):

// Relocate org.apache.http packages because Apache Flink include old version of this library (we place them for using S3 compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }

    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}

然后您应该指定 core-site.xmlflink-conf.yaml 因为hadoop兼容的文件系统使用此配置加载设置:

...
fs.hdfs.hadoopconf: /flink/conf
...

如你所见,我只是把它放在 <fink home>/conf 目录。它具有以下设置:

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> // because S3A better then other: https://wiki.apache.org/hadoop/AmazonS3
    </property>
    <property>
        <name>fs.s3a.endpoint</name>
        <value>my-riak-cs.stage.local</value>  // this is my Riak CS host
    </property>
    <property>
        <name>fs.s3a.connection.ssl.enabled</name> // my Riak CS in staging doesn't support SSL
        <value>false</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>????</value> // this is my access key for Riak CS
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>????</value> // this is my secret key for Riak CS
    </property>
</configuration>

然后您应该在中配置riak cs bucket flink-conf.yaml 作为推荐人:

...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...

并在riak cs中创建桶。我正在使用 s3cmd (安装在 brew 在我的os x开发环境中):

s3cmd mb s3://example-staging-flink

仅供参考:使用前 s3cmd 你应该配置它使用 s3cmd --configure 然后在中修复一些设置 ~/.s3cmd 文件:

signature_v2 = True // because Riak CS using S3 V2 interface
use_https = False // if your don't use SSL
access_key = ???
secret_key = ???
host_base = my-riak-cs.stage.local // your Riak CS host
host_bucket = %(bucket).my-riak-cs.stage.local // format of bucket used by Riak CS

所以,这就是您应该在riak cs中为独立ha apache flink集群的save/restore状态配置的全部内容。

相关问题