Fetching a JAR file from S3 with the Flink Kubernetes Operator

esyap4oy · asked 2023-02-17 · Apache

I'm experimenting with the new Flink Kubernetes Operator and have been able to do almost everything I need, except for one thing: fetching a JAR file from an S3 file system.

Background

I have a Flink application running in an EKS cluster on AWS, and everything is kept in an S3 bucket: savepoints, checkpoints, high-availability metadata, and the JAR file are all stored there.
Savepoints, checkpoints, and high-availability data are saved to the bucket without problems, but when I try to fetch the JAR file from the same bucket I get the error: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto.
I found this thread, but I couldn't get the resource fetcher to work properly, and that solution isn't ideal anyway; I'm looking for a more direct approach.

Deployment files

These are the files I'm deploying in the cluster:
deployment.yml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-deployment
spec:
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.15.3.jar;flink-s3-fs-hadoop-1.15.3.jar
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            path: /tmp
            type: Directory
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    state.checkpoints.dir: s3://kubernetes-operator/checkpoints
    state.savepoints.dir: s3://kubernetes-operator/savepoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://kubernetes-operator/ha
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  serviceAccount: flink

session-job.yml

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-session-job
spec:
  deploymentName: flink-deployment
  job:
    jarURI: s3://kubernetes-operator/savepoints/flink.jar
    parallelism: 3
    upgradeMode: savepoint
    savepointTriggerNonce: 0

The Flink Kubernetes Operator version I'm using is 1.3.1.
Is there anything I'm missing or doing wrong?

0lvr5msh1#

The JAR download happens inside the flink-kubernetes-operator pod. When you apply a FlinkSessionJob, the operator recognizes the CRD, tries to download the JAR from the jarURI location, builds a JobGraph, and submits the session job to the FlinkDeployment. The Flink Kubernetes Operator runs Flink inside itself to build that JobGraph, so you must add flink-s3-fs-hadoop-1.15.3.jar to /opt/flink/plugins/s3-fs-hadoop/ inside the flink-kubernetes-operator pod.
You can add the jar by extending the ghcr.io/apache/flink-kubernetes-operator image: curl the jar and copy it into the plugin location.
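
A minimal sketch of that Dockerfile (the 1.3.1 image tag, the Maven Central URL, and BuildKit's --chmod flag are assumptions; adjust them to your operator and Flink versions). It uses ADD instead of curl so the build doesn't depend on curl being installed in the image:

FROM ghcr.io/apache/flink-kubernetes-operator:1.3.1
# Pull the S3 filesystem plugin from Maven Central into the operator's plugin
# directory; --chmod makes the jar readable by the non-root operator user.
ADD --chmod=644 https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.3/flink-s3-fs-hadoop-1.15.3.jar /opt/flink/plugins/s3-fs-hadoop/

Build and push this image, then point the operator deployment (or its Helm values) at it instead of the stock image.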

Alternatively, you can write an initContainer that downloads the jar into a volume, and mount that volume:

volumes:
  - name: s3-plugin
    emptyDir: { }
initContainers:
  - name: busybox
    image: busybox:latest
    # The download step is omitted in this sketch; add a command that fetches
    # flink-s3-fs-hadoop-1.15.3.jar into the mount path below.
    volumeMounts:
      - mountPath: /opt/flink/plugins/s3-fs-hadoop
        name: s3-plugin
containers:
  - image: 'ghcr.io/apache/flink-kubernetes-operator:95128bf'
    name: flink-kubernetes-operator
    volumeMounts:
      - mountPath: /opt/flink/plugins/s3-fs-hadoop
        name: s3-plugin

Also, if you use a serviceAccount for S3 authentication, add the following to the flinkConfiguration:

fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
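
For that provider to pick up credentials on EKS, the service account the pods run under needs an IAM role bound via IRSA. A minimal sketch, assuming the flink service account from the deployment above (the role ARN is a placeholder):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  annotations:
    # IRSA: binds an IAM role with S3 permissions to this service account.
    # The ARN below is a placeholder; use your own account ID and role name.
    eks.amazonaws.com/role-arn: arn:aws:iam::111122223333:role/flink-s3-access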
