I'm running Flink in Docker Compose, with some data files mounted into a Docker volume; a Flink job reads those data files using the File connector. I tested the file-reading logic locally in IntelliJ and it works fine. However, when I run the job in the Docker environment, it fails with the following exception:
Caused by: java.io.FileNotFoundException: File file:/quark/data/yelp/user.json.gz does not exist or the user running Flink ('flink') has insufficient permissions to access it.
at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106)
at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.openStream(StreamFormatAdapter.java:157)
at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.createReader(StreamFormatAdapter.java:70)
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
To verify that the file exists, I added the following precondition in the main function before submitting the Flink job:
Preconditions.checkState(
    new LocalFileSystem().getFileStatus(inputFilePath) != null,
    "file status must be valid");
final FileSource<String> source =
    FileSource.forRecordStreamFormat(new TextLineInputFormat(), inputFilePath).build();
try (StreamExecutionEnvironment env = createExecutionEnvironment()) {
    LOG.info("Running Flink environment with config: " + env.getConfig());
    final DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), targetTopic);
    String pulsarTopic = tenet + "/" + namespace + "/" + targetTopic;
    PulsarSink<String> sink = getPulsar().createSink(pulsarTopic);
    LOG.info("Start writing to sink: " + pulsarTopic);
    stream.sinkTo(sink);
    env.execute();
}
The precondition check passes, so the file exists and is readable. However, the submitted job still fails.
My hypothesis was that I submitted the job as the root user in the Docker container, while the Flink job itself runs as the flink user. However, I used su flink to switch to the flink user and confirmed that it can read the mounted data files. I also tried changing the permissions on the data mount folder to allow rw access for everyone. Now I'm not sure what I'm missing.
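When debugging this kind of failure, it can help to check the path from inside every Flink container, not only the one the job is submitted from, since each service has its own mounts. A diagnostic sketch (not part of the job), assuming the service names from the compose file shown below:

```shell
# Check the mount as the 'flink' user in BOTH Flink containers.
# A file can be mounted in one service but missing in the other.
docker compose exec -u flink jobmanager  ls -l /quark/data/yelp/user.json.gz
docker compose exec -u flink taskmanager ls -l /quark/data/yelp/user.json.gz

# Verify the gzip file is readable end to end, not just listable:
docker compose exec -u flink taskmanager gzip -t /quark/data/yelp/user.json.gz \
  && echo "readable"
```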
By the way, I use the official Flink 1.16.1 Docker image as my base image:
ARG FLINK_VERSION=1.16.1
FROM flink:${FLINK_VERSION}
# add vim so it's easier to edit files during dev time
RUN apt-get update && apt-get install vim -y
Here is my docker compose file:
version: "3.9"
services:
  pulsar:
    image: "apachepulsar/pulsar:3.0.0"
    container_name: pulsar
    user: root
    command: bin/pulsar standalone
    ports:
      - 6650:6650
      - 8080:8080
    volumes:
      - ./tmp/pulsardata:/pulsar/data
      - ../../bin:/quark/bin
    environment:
      - PULSAR_BIN=/pulsar/bin
  jobmanager:
    image: ${FLINK_IMAGE}
    container_name: flink_master
    ports:
      - "18081:8081"
    command: jobmanager
    volumes:
      - ../../java/quark-flink-jobs/build/libs:/quark/jobs
      - ../../java/quark-dev-tools/build/libs:/quark/dev-tools
      - ../../bin:/quark/bin
      - ../../data:/quark/data
      - ./flink_job_config:/quark/config
      - ./log:/opt/flink/log
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
      - FLINK_BIN=/opt/flink/bin
      - FLINK_JOBS_FOLDER=/quark/jobs
      - YELP_DATA_FOLDER=/quark/data/yelp
  taskmanager:
    image: ${FLINK_IMAGE}
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  localstack:
    image: localstack/localstack
    container_name: localstack
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    environment:
      - DEBUG=${DEBUG-}
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${LOCALSTACK_VOLUME_DIR:-./tmp/localstack/volume}:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
I expected the job to run in Docker the same way it does from IntelliJ.
1 Answer
Nvm. I found the problem: I forgot that I need to mount the data files in both the Flink jobmanager and the taskmanager. The precondition passed because it ran in the container the job was submitted from, which has the /quark/data mount, while the FileSource splits are actually read on the taskmanager (see FileSourceSplitReader in the stack trace), which had no such mount.
It is exactly the same issue as this question: Job/jar in Apache Flink doesn't have permisson to file in Docker
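Based on the compose file above, a minimal sketch of the fix is to add the same data mount to the taskmanager service; only /quark/data is needed for the file reads, the other jobmanager mounts are not required there:

```yaml
  taskmanager:
    image: ${FLINK_IMAGE}
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      - ../../data:/quark/data   # same host path the jobmanager mounts
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
```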