我正在使用Flink 1.15 DataStream API做ETL作业,我想将我的作业设置为BATCH执行模式,所以我使用了官方网站提供的代码stie. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
但是,我遇到了以下错误:java.lang.UnsupportedOperationException at org.apache.flink.runtime.io.network.partition.ResultPartition.getAllDataProcessedFuture(ResultPartition.java:233)
个
我的整个代码逻辑
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
`env.setRuntimeMode(RuntimeExecutionMode.BATCH);`
DataStream<OutputType> result = text
.map(/* map logic here */ )
.keyBy(/* keyby logic here */)
.reduce(/* reduce logic here */)
result.writeAsText("filePath")
字符串
任何人都可以提供一些见解,为什么我得到这个错误,以及如何解决它?谢谢!
我的项目背景(如果你想知道更多为什么我想使用批处理模式):
我目前正在处理一个从S3读取数据的作业,用键对数据执行一些转换和归约。在这个过程中,我遇到了一个问题,我的应用程序似乎存储了每个中间归约结果,而不仅仅是每个键的最终归约值。我明白这可能是由于流式执行的性质,我的情况与这篇文章很相似:https://stackoverflow.com/questions/58828218/how-to-avoid-duplicate-key-tuples-in-word-count-w-apache-flink
所以我想改为批处理模式,看看它是否工作。
我尝试:
1.我删除了转换逻辑,但仍然有如上所述的错误:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
`env.setRuntimeMode(RuntimeExecutionMode.BATCH);`
text.writeAsText("filePath")
型
1条答案
按热度按时间mrzz3bfm1#
我可以通过禁用检查点来修复这个问题。这是特定于在AWS托管Flink中运行它的。在Terraform中,我添加了这个
字符串
aws_kinesisanalyticsv2_application资源内部。