File does not exist or the user running Flink ('flink') has insufficient permissions to access it

yqlxgs2m  posted on 2023-09-28  in  Apache

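I am running Flink in docker compose, with some data files mounted into docker volumes, and the Flink jobs read these data files using the File connector. I tested the file reading logic locally in IntelliJ and it works fine. However, when I run the jobs in the docker environment, they fail with the following exception: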

Caused by: java.io.FileNotFoundException: File file:/quark/data/yelp/user.json.gz does not exist or the user running Flink ('flink') has insufficient permissions to access it.
    at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
    at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.openStream(StreamFormatAdapter.java:157)
    at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.createReader(StreamFormatAdapter.java:70)
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)

To verify that the file exists, I added the following precondition to the main function, before the Flink job is submitted:

Preconditions.checkState(new LocalFileSystem().getFileStatus(inputFilePath) != null,
    "file status must be valid");

final FileSource<String> source =
    FileSource.forRecordStreamFormat(new TextLineInputFormat(), inputFilePath).build();
try (StreamExecutionEnvironment env = createExecutionEnvironment()) {

    LOG.info("Running Flink environment with config:" + env.getConfig());

    final DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), targetTopic);

    String pulsarTopic = tenet + "/" + namespace + "/" + targetTopic;
    PulsarSink<String> sink = getPulsar().createSink(pulsarTopic);

    LOG.info("Start writing to sink: " + pulsarTopic);
    stream.sinkTo(sink);
    env.execute();
}

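The precondition check passes, because the file exists and is readable. However, the submitted job still fails.
My hypothesis was that I submit the job as the root user inside the docker container, while the Flink job itself runs as the flink user.
However, I used su flink to become the flink user and tested that the mounted data files are readable. I also tried changing the permissions of the mounted data folder to give everyone rw access. Now I am not sure what I am missing.
By the way, I am using the official Flink 1.16.1 Docker image as my base image: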

ARG FLINK_VERSION=1.16.1

FROM flink:${FLINK_VERSION}

# add vim so it's easier to edit files during dev time
RUN apt-get update && apt-get install vim -y

This is my docker compose file:

version: "3.9"
services:
  pulsar:
    image: "apachepulsar/pulsar:3.0.0"
    container_name: pulsar  
    user: root
    command: bin/pulsar standalone
    ports:
      - 6650:6650
      - 8080:8080
    volumes:
      - ./tmp/pulsardata:/pulsar/data
      - ../../bin:/quark/bin
    environment:
      - PULSAR_BIN=/pulsar/bin
     
  jobmanager:
    image: ${FLINK_IMAGE} 
    container_name: flink_master
    ports:
      - "18081:8081"
    command: jobmanager
    volumes:
      - ../../java/quark-flink-jobs/build/libs:/quark/jobs
      - ../../java/quark-dev-tools/build/libs:/quark/dev-tools
      - ../../bin:/quark/bin
      - ../../data:/quark/data
      - ./flink_job_config:/quark/config
      - ./log:/opt/flink/log
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager   
      - FLINK_BIN=/opt/flink/bin
      - FLINK_JOBS_FOLDER=/quark/jobs
      - YELP_DATA_FOLDER=/quark/data/yelp
      

  taskmanager:
    image: ${FLINK_IMAGE} 
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2  

  localstack:
    image: localstack/localstack
    container_name: localstack
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    environment:
      - DEBUG=${DEBUG-}
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${LOCALSTACK_VOLUME_DIR:-./tmp/localstack/volume}:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"

I expected the job to run the same way it does when launched from IntelliJ.

7nbnzgx9  #1

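Nvm. I found the problem: I forgot that I need to mount the data files in both the Flink JobManager and the TaskManager.
It is exactly the same as this question: Job/jar in Apache Flink doesn't have permisson to file in Docker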
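
For reference, a minimal sketch of what the fix could look like in the compose file from the question. The stack trace comes from FileSourceSplitReader, which runs on the TaskManager, while the precondition in main presumably ran in the container where the job was submitted, and that container had /quark/data mounted. The sketch assumes the taskmanager should get the same host path and mount point as the jobmanager service; the answer does not show the actual change, so treat this as illustrative only.

```yaml
  taskmanager:
    image: ${FLINK_IMAGE}
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      # Assumption: same host path and mount point as the jobmanager service,
      # so file:/quark/data/yelp/user.json.gz also resolves on the TaskManager.
      - ../../data:/quark/data
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
```

After recreating the containers, something like docker compose exec taskmanager ls -l /quark/data/yelp should list the data files if the mount is in place.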
