我正在试验新的Flink Kubernetes操作符,我已经能够做几乎所有我需要的事情,除了一件事:从S3文件系统中获取JAR文件。
背景
我有一个Flink应用程序运行在AWS的EKS集群中,所有的信息都保存在S3存储桶中,比如保存点、检查点、高可用性和JAR文件都存储在那里。
我已经能够将保存点、检查点和高可用性信息保存在存储桶中,但是当尝试从同一存储桶中获取JAR文件时,我得到了错误:Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto.
我可以进入this thread,但是我不能让资源获取器正常工作,而且解决方案也不理想,我正在寻找一个更直接的方法。
部署文件
下面是我在群集中部署的文件:
deployment.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-deployment
spec:
podTemplate:
apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
containers:
- name: flink-main-container
env:
- name: ENABLE_BUILT_IN_PLUGINS
value: flink-s3-fs-presto-1.15.3.jar;flink-s3-fs-hadoop-1.15.3.jar
volumeMounts:
- mountPath: /flink-data
name: flink-volume
volumes:
- name: flink-volume
hostPath:
path: /tmp
type: Directory
image: flink:1.15
flinkVersion: v1_15
flinkConfiguration:
state.checkpoints.dir: s3://kubernetes-operator/checkpoints
state.savepoints.dir: s3://kubernetes-operator/savepoints
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://kubernetes-operator/ha
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
serviceAccount: flink
session-job.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: flink-session-job
spec:
deploymentName: flink-deployment
job:
jarURI: s3://kubernetes-operator/savepoints/flink.jar
parallelism: 3
upgradeMode: savepoint
savepointTriggerNonce: 0
我使用的Flink Kubernetes运算符版本是1.3.1
有什么我错过了或者做错了的吗?
1条答案
按热度按时间0lvr5msh1#
jar的下载发生在flink-kubernetes-operator pod中。因此,当你应用FlinkSessionJob时,fink操作员会识别Crd,并尝试从jarUri位置下载jar,构建一个JobGraph,并将sessionJob提交给JobDeployment。Flink Kubernetes Operator也会在其中运行flink来构建一个JobGraph。因此,您必须将flink-s3-fs-hadoop-1.15.3.jar添加到flink-kubernetes-operator内的位置**/opt/flink/plugins/s3-fs-hadoop/**中
您可以通过扩展www.example.com映像添加jarghcr.io/apache/flink-kubernetes-operator, curl jar并将其复制到插件位置
或
您可以编写一个initContainer,它将把jar下载到一个卷并挂载该卷
此外,如果您使用serviceAccount进行S3身份验证,请在flinkConfig中给予以下配置