Flink 1.15:为DataStream API设置BATCH执行模式时出错

v64noz0r  于 2024-01-04  发布在  Apache
关注(0)|答案(1)|浏览(171)

我正在使用Flink 1.15 DataStream API做ETL作业,我想将我的作业设置为BATCH执行模式,所以我使用了官方网站提供的代码stie. env.setRuntimeMode(RuntimeExecutionMode.BATCH);但是,我遇到了以下错误:
java.lang.UnsupportedOperationException at org.apache.flink.runtime.io.network.partition.ResultPartition.getAllDataProcessedFuture(ResultPartition.java:233)
我的整个代码逻辑

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

`env.setRuntimeMode(RuntimeExecutionMode.BATCH);`

DataStream<OutputType> result = text
.map(/* map logic here */ )
.keyBy(/* keyby logic here */)
.reduce(/* reduce logic here */)

result.writeAsText("filePath")

字符串
任何人都可以提供一些见解,为什么我得到这个错误,以及如何解决它?谢谢!
我的项目背景(如果你想知道更多为什么我想使用批处理模式):
我目前正在处理一个从S3读取数据的作业,用键对数据执行一些转换和归约。在这个过程中,我遇到了一个问题,我的应用程序似乎存储了每个中间归约结果,而不仅仅是每个键的最终归约值。我明白这可能是由于流式执行的性质,我的情况与这篇文章很相似:https://stackoverflow.com/questions/58828218/how-to-avoid-duplicate-key-tuples-in-word-count-w-apache-flink
所以我想改为批处理模式,看看它是否工作。
我尝试:
1.我删除了转换逻辑,但仍然有如上所述的错误:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

`env.setRuntimeMode(RuntimeExecutionMode.BATCH);`

text.writeAsText("filePath")

mrzz3bfm

mrzz3bfm1#

我可以通过禁用检查点来修复这个问题。这是特定于在AWS托管Flink中运行它的。在Terraform中,我添加了这个

flink_application_configuration {
      checkpoint_configuration {
        configuration_type    = "CUSTOM"
        checkpointing_enabled = false // disable checkpoints since it is a batch job
      }

字符串
aws_kinesisanalyticsv2_application资源内部。

相关问题