在一个flink作业中使用collect()和env.execute()

q5lcpyga  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(512)

我想用flink写一个需要两个阶段的计算。
在第一阶段,我创建一个图并获取其顶点ID:

  1. List<String> ids = graph.getVertexIds().collect();

在第二阶段,我想使用这些ID为每个顶点运行singlesourceshortestpath。

  1. for (String id: ids){
  2. System.out.println("Source Id: "+id);
  3. graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
  4. }

它在本地工作(在intellijide和命令行中使用 ./bin/flink run ... ),但当我使用flink的webui提交作业时,程序只执行到 collect() 方法,而不运行程序的其余部分(对于语句和 print() ).
有什么问题?
这是我的密码:

  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.api.java.ExecutionEnvironment;
  3. import org.apache.flink.graph.Edge;
  4. import org.apache.flink.graph.Graph;
  5. import org.apache.flink.graph.library.SingleSourceShortestPaths;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. public class Main {
  9. public static void main(String[] args) throws Exception {
  10. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  11. Edge<String, Double> e1 = new Edge<String, Double>("1", "2", 0.5);
  12. Edge<String, Double> e2 = new Edge<String, Double>("2", "3", 0.5);
  13. Edge<String, Double> e3 = new Edge<String, Double>("4", "5", 0.5);
  14. Edge<String, Double> e4 = new Edge<String, Double>("5", "6", 0.5);
  15. Edge<String, Double> e5 = new Edge<String, Double>("7", "8", 0.5);
  16. List<Edge<String, Double>> edgeList = new ArrayList<Edge<String, Double>>();
  17. edgeList.add(e1);
  18. edgeList.add(e2);
  19. edgeList.add(e3);
  20. edgeList.add(e4);
  21. edgeList.add(e5);
  22. Graph<String, String, Double> graph = Graph.fromCollection(edgeList,
  23. new MapFunction<String, String>() {
  24. public String map(String value) {
  25. return value;
  26. }
  27. }, env);
  28. List<String> ids = graph.getVertexIds().collect();
  29. for (String id: ids){
  30. System.out.println("Source Id: "+id);
  31. graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
  32. }
  33. }
  34. }
tez616oj

tez616oj1#

基于此链接,flink转换是延迟的,这意味着在调用sink操作之前不会执行它们。
flink中的sink操作触发流的执行以产生所需的程序结果,例如将结果保存到文件系统或将其打印到标准输出
方法如 Dataset.collect() , Dataset.Count() 以及 Dataset.print() 是触发实际数据转换的接收器操作。

相关问题