如何在本地测试中将AWS Configs传递给Flink的flink-s3-fs-hadoop插件?

lsmepo6l  于 2024-01-04  发布在  Apache
关注(0)|答案(1)|浏览(179)

问题摘要

  • 我正在尝试为我的flink应用程序运行本地集成测试,它从AWS s3读取parquet文件,进行一些转换,并将输出写回s3;我使用以下代码:
  • Flink的Parquet格式源码:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/formats/parquet/
  • localstack/testcontainers在localhost上启动本地s3
  • junit 5运行我的测试
  • Flink minicluster在测试期间运行我的应用程序
  • IntelliJ IDEA
  • Flink版本1.17.0、maven和java 11
  • 因为flink应用程序与s3交互(我相信上面的parquet格式需要它),所以我在项目中使用flink-s3-fs-hadoop作为测试依赖项
    *问题
  • 似乎没有办法将所有必需的AWS服务传递给flink-s3-fs-hadoop插件使用的S3客户端
  • 由于上述原因,使用Flink和AWS s3进行本地测试目前是不可能的;当我的源代码试图读取我放在localstack s3中的parquet文件时,我遇到了AmazonS3Exception 403 Forbidden
  • 我需要通过的AWS认证如下:
  • s3.access-key
  • s3.secret-key
  • s3.endpoint
  • s3.endpoint.region
  • s3.path.style.access
  • 注意:应用程序在“prod”中工作,因为它在EMR上运行;只是在本地发生此问题
    我所尝试的
  • 添加AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY、AWS_ENDPOINT_URL和AWS_REGION作为环境变量(IntelliJ运行配置)
  • 在我设置这些之前,我得到了一个错误,即无法找到访问密钥和密钥,所以我相信这些能够被拾取(在给定https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Authenticating_via_the_AWS_Environment_Variables的情况下是有意义的)
  • 其他环境变量没有被选中;我通过在调试模式下运行我的应用程序并查看AmazonS 3客户端参数来确认
  • 在我的测试资源文件夹下添加一个conf/flink-conf.yaml,并设置以下环境变量FLINK_CONF_DIR=src/test/resources/conf
  • 看起来Flink可以找到这个文件,但它没有将任何值传递给插件
  • 使用指定的AWS脚本创建本地执行环境(请参见下面的代码)
Properties props = new Properties();
        props.put("s3.access-key", "test");
        props.put("s3.secret-key",  "test");
        props.put("s3.endpoint", "http://localstack:4566");
        props.put("s3.endpoint.region", "us-east-1");
        props.put("s3.path.style.access", "true");
        Configuration configuration = ConfigurationUtils.createConfiguration(props);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(16, configuration);

字符串

  • 我使用docker-compose在本地启动了一个Flink集群(jobmanager + taskmanager),在FLINK_PROPERTIES环境变量中添加了作为k,v对的AWS对象(然后添加到flink-conf),启用了flink-s3-fs-hadoop插件,并确认flink应用程序在提交到这个集群时可以正常工作(即,这确认了当添加对象并可用于插件时应用程序可以正常工作)。
  • 注意:即使这种方式有效,但对于我的用例来说还不够好;我需要能够在像Junit这样的自动化测试套件中运行测试
services:
  localstack:
    container_name: "awslocal"
    image: localstack/localstack:1.4.0
    ports:
      - "127.0.0.1:4510-4559:4510-4559"
      - "127.0.0.1:4566:4566"
      - "127.0.0.1:4571:4571"
    environment:
      - SERVICES=s3
      - DEBUG=1
      - LS_LOG=trace
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - AWS_DEFAULT_REGION=us-east-1
      - HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${TMPDIR:-/tmp}/localstack:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
  jobmanager:
    container_name: "jobmanager"
    image: flink:1.17.0-java11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        s3.access-key: test
        s3.secret-key: test
        s3.endpoint: http://awslocal:4566
        s3.endpoint.region: us-east-1
        s3.path.style.access: true
      - ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.0.jar
    volumes:
      - /Users/myname/IdeaProjects/my-flink-app/target:/opt/flink/usrlib
  taskmanager:
    container_name: "taskmanager"
    image: flink:1.17.0-java11
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        s3.access-key: test
        s3.secret-key: test
        s3.endpoint: http://awslocal:4566
        s3.endpoint.region: us-east-1
        s3.path.style.access: true
      - ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17.0.jar

ztyzrc3y

ztyzrc3y1#

你可以试试这个:
1-使用以下命令创建测试配置文件(flink-config.yaml)

## S3 config
s3.access-key: test
s3.secret-key: test
s3.endpoint: http://awslocal:4566
s3.endpoint.region: us-east-1
s3.path.style.access: true

字符串
2-加载此配置,

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration loadedConfig = GlobalConfiguration.loadConfiguration("directory/where/is/flinkconfig");
FileSystem.initialize(config, null);
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(loadedConfig);


关键字:FileSystem.initialize(config, null);

相关问题