我想通过java应用程序将配置好的flink作业部署到独立集群。我想用restclusterclient。
据我所知,我可以用以下方法:
用我的作业创建一个jar(例如用数据转换从一个kafka主题到另一个主题的简单流);
使用packagedprograutils从jar文件创建jobgraph;
初始化restclusterclient并将jobgraph提交到集群。
PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setJarFile(new File("path/to/my/jar/file"))
.setArguments(arguments)
.build();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, flinkConfiguration, 1, false);
try (RestClusterClient<StandaloneClusterId> client = new RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance())) {
client.submitJob(jobGraph).get();
}
但是,我想配置我的作业并将其部署到一个java应用程序中的集群。我找到了以下方法:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// add source, any data processing and sink
StreamGraph streamGraph = graphFlow.getConfiguredEnvironment().getStreamGraph("myGraphName", false);
JobGraph graph = streamGraph.getJobGraph();
try (RestClusterClient<StandaloneClusterId> client = new RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance())) {
client.submitJob(jobGraph).get();
}
此解决方案似乎不可接受,因为streamexecutionenvironment方法被注解为@internal。还有哪些方法可以用来构建可以用restclusterclient上传的jobgraph对象?
由于需要将作业上载到flink cluster,因此无法使用remotestreamenvironment。
flink版本是1.11.2
暂无答案!
目前还没有任何答案,快来回答吧!