使用自定义配置通过Flink流式传输执行环境时未接收到输出

wbgh16ku  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(140)

我正在运行Apache Flink版本1.12.7,并将流执行环境配置为任务管理器的任务插槽数= 3(只是试验),但无法看到环境读取的文件的输出。相反,如日志中所示,执行图被卡在SCHEDULED状态,无法进入RUNNING状态。
请注意,如果属性文件中未传递任何配置,则一切正常,并且由于执行图切换到RUNNING状态,环境能够读取文件,因此可以看到输出。
代码如下:

ParameterTool parameters = ParameterTool.fromPropertiesFile("src/main/resources/application.properties");
    Configuration config = Configuration.fromMap(parameters.toMap());
    TaskExecutorResourceUtils.adjustForLocalExecution(config);

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(config);

    System.out.println("Config Params : " + config.toMap());

    DataStream<String> inputStream =
            env.readTextFile(FILEPATH);

    DataStream<String> filteredData = inputStream.filter((String value) -> {
        String[] tokens = value.split(",");
        return Double.parseDouble(tokens[3]) >= 75.0;
    });

    filteredData.print(); // no o/p seen if configuration object is set otherwise everything works as expected

    env.execute("Filter Country Details");

需要帮助来理解这种行为,以及为了打印输出沿着进行一些自定义配置,应该做哪些更改。谢谢。

jjjwad0x

jjjwad0x1#

Okay..So found the answer to the above puzzle by referring to some links mentioned below.

Solution : So I set the parallelism (env.setParallelism) in the above code just after configuring the streaming execution environment and the file was read with output generated as expected.

Post that, experimented with a few things :

  • set parallelism equal to number of task slots = everything worked
  • set parallelism greater than number of task slots = intermittent results
  • set parallelism less than number of task slots = intermittent results.

As per this link corresponding to Flink Architecture,
A Flink cluster needs exactly as many task slots as the highest parallelism used in the job
So its best to go with no. of task slots for a task manager equal to the parallelism configured.

相关问题