如何在flink中打印批处理的执行计划?

raogr8fs  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(531)

我用 Flink 写了一个 Java 程序,用来将两个矩阵相乘。我使用批处理(DataSet)环境来处理它,并且想显示它的执行计划。之前做流处理(DataStream)的例子时,我只需调用 StreamExecutionEnvironment.getExecutionEnvironment().getExecutionPlan() 即可。Flink 的批处理环境也提供了同样的方法,但当我调用它时,得到如下错误: java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'. 我认为我是按照这里描述的方式做的:https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_plans.html ,但不知为何仍然出现了这个异常。

  1. import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.api.common.functions.MapPartitionFunction;
  3. import org.apache.flink.api.common.functions.ReduceFunction;
  4. import org.apache.flink.api.common.typeinfo.Types;
  5. import org.apache.flink.api.java.DataSet;
  6. import org.apache.flink.api.java.ExecutionEnvironment;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.api.java.tuple.Tuple3;
  9. import org.apache.flink.api.java.tuple.Tuple4;
  10. import org.apache.flink.util.Collector;
  11. public class MatrixMultiplication { // Flink DataSet (batch) program that multiplies matrix A by matrix B
  12. private static final String MATRIX_A = "A"; // tag compared against Tuple4.f0 to recognize entries of matrix A
  13. private static final String MATRIX_B = "B"; // tag compared against Tuple4.f0 to recognize entries of matrix B
  14. public MatrixMultiplication() throws Exception { // builds the whole pipeline and prints results; all work happens in this constructor
  15. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  16. DataSet<Tuple4<String, Integer, Integer, Integer>> matrixA = env.readCsvFile("resources/matrixA.csv") // one CSV row per matrix entry
  17. .fieldDelimiter(",").types(Integer.class, Integer.class, Integer.class)
  18. .map(t -> new Tuple4<String, Integer, Integer, Integer>("A", t.f0, t.f1, t.f2)) // prefix each row with the matrix tag "A"
  19. .returns(Types.TUPLE(Types.STRING, Types.INT, Types.INT, Types.INT)); // explicit type info for the lambda's result
  20. System.out.println("Matrix A");
  21. matrixA.print(); // print() triggers an execution of the plan defined so far
  22. DataSet<Tuple4<String, Integer, Integer, Integer>> matrixB = env.readCsvFile("resources/matrixB.csv") // one CSV row per matrix entry
  23. .fieldDelimiter(",").types(Integer.class, Integer.class, Integer.class)
  24. .map(t -> new Tuple4<String, Integer, Integer, Integer>("B", t.f0, t.f1, t.f2)) // prefix each row with the matrix tag "B"
  25. .returns(Types.TUPLE(Types.STRING, Types.INT, Types.INT, Types.INT)); // explicit type info for the lambda's result
  26. System.out.println("Matrix B");
  27. matrixB.print(); // print() triggers another execution
  28. int columnsMatrixB = 2; // hard-coded; used as the replication count for entries of matrix A
  29. int linesMatrixA = 2; // hard-coded; used as the replication count for entries of matrix B
  30. DataSet<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> keyValueMatrixA = matrixA
  31. .mapPartition(new MapMatrixToKeysAndValues(columnsMatrixB)); // each A entry is emitted once per k = 1..columnsMatrixB
  32. DataSet<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> keyValueMatrixB = matrixB
  33. .mapPartition(new MapMatrixToKeysAndValues(linesMatrixA)); // each B entry is emitted once per i = 1..linesMatrixA
  34. DataSet<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> matrixAB = keyValueMatrixA.union(keyValueMatrixB); // keyed entries of both matrices in one data set
  35. DataSet<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> matrixAB_01 = matrixAB.groupBy(0) // group by the full (i, k, i+j) key
  36. .reduce(new ProductReducer()); // multiply the values that share a key
  37. DataSet<Tuple2<Tuple2<Integer, Integer>, Integer>> matrixAB_02 = matrixAB_01.map(new SumMapper()); // shorten the key to (i, k)
  38. DataSet<Tuple2<Tuple2<Integer, Integer>, Integer>> productMatrixAB = matrixAB_02.groupBy(0) // group by (i, k)
  39. .reduce(new SumReducer()); // add up the products of each (i, k) group
  40. System.out.println("Matrix AB");
  41. productMatrixAB.print(); // triggers an execution; no new sink is defined after this call
  42. // String executionPlan = env.getExecutionPlan();
  43. // System.out.println("ExecutionPlan ........................ ");
  44. System.out.println(productMatrixAB.getExecutionEnvironment().getExecutionPlan()); // throws "No new data sinks have been defined since the last execution" because print() above already executed the job
  45. // System.out.println("........................ ");
  46. }
  47. public static class MapMatrixToKeysAndValues implements // turns every tagged matrix entry into `count` (key, value) records
  48. MapPartitionFunction<Tuple4<String, Integer, Integer, Integer>, Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> {
  49. private static final long serialVersionUID = 6992353073599144457L;
  50. private int count; // number of records emitted per input entry
  51. public MapMatrixToKeysAndValues(int count) {
  52. this.count = count;
  53. }
  54. @Override
  55. public void mapPartition(Iterable<Tuple4<String, Integer, Integer, Integer>> values,
  56. Collector<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> out) throws Exception {
  57. for (Tuple4<String, Integer, Integer, Integer> tuple : values) {
  58. for (int c = 1; c <= count; c++) { // c supplies the index the entry itself does not carry (k for A, i for B)
  59. Tuple3<Integer, Integer, Integer> key = null;
  60. Integer value = null;
  61. if (MATRIX_A.equals(tuple.f0)) {
  62. // key(i,k,i+j) for k=1...N
  63. Integer i = tuple.f1;
  64. Integer j = tuple.f2;
  65. Integer k = c;
  66. key = new Tuple3<Integer, Integer, Integer>(i, k, i + j);
  67. // value matrix[i,j]
  68. value = tuple.f3;
  69. } else if (MATRIX_B.equals(tuple.f0)) {
  70. // key(i,k,i+j) for i=1...L
  71. Integer i = c;
  72. Integer j = tuple.f1;
  73. Integer k = tuple.f2;
  74. key = new Tuple3<Integer, Integer, Integer>(i, k, i + j);
  75. // value matrix[j,k]
  76. value = tuple.f3;
  77. }
  78. out.collect(new Tuple2<Tuple3<Integer, Integer, Integer>, Integer>(key, value)); // NOTE(review): key and value are still null here if f0 is neither "A" nor "B"
  79. }
  80. }
  81. }
  82. }
  83. public static class ProductReducer implements ReduceFunction<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>> { // multiplies the values of two records from the same group
  84. private static final long serialVersionUID = 6166767956669902083L;
  85. @Override
  86. public Tuple2<Tuple3<Integer, Integer, Integer>, Integer> reduce(
  87. Tuple2<Tuple3<Integer, Integer, Integer>, Integer> value1,
  88. Tuple2<Tuple3<Integer, Integer, Integer>, Integer> value2) throws Exception {
  89. Integer product = null;
  90. product = value1.f1 * value2.f1;
  91. return new Tuple2<Tuple3<Integer, Integer, Integer>, Integer>(value1.f0, product); // the result keeps value1's key
  92. }
  93. }
  94. public static class SumMapper implements // re-keys a record from a three-component key to its first two components
  95. MapFunction<Tuple2<Tuple3<Integer, Integer, Integer>, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> {
  96. private static final long serialVersionUID = -1437482917757334157L;
  97. @Override
  98. public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple2<Tuple3<Integer, Integer, Integer>, Integer> value)
  99. throws Exception {
  100. Tuple2<Integer, Integer> key = new Tuple2<Integer, Integer>(value.f0.f0, value.f0.f1); // drop the third key component
  101. return new Tuple2<Tuple2<Integer, Integer>, Integer>(key, value.f1); // the value is passed through unchanged
  102. }
  103. }
  104. public static class SumReducer implements ReduceFunction<Tuple2<Tuple2<Integer, Integer>, Integer>> { // adds the values of two records from the same group
  105. private static final long serialVersionUID = 7849401047616065465L;
  106. @Override
  107. public Tuple2<Tuple2<Integer, Integer>, Integer> reduce(Tuple2<Tuple2<Integer, Integer>, Integer> value1,
  108. Tuple2<Tuple2<Integer, Integer>, Integer> value2) throws Exception {
  109. Tuple2<Integer, Integer> key = new Tuple2<Integer, Integer>(value1.f0.f0, value1.f0.f1); // fresh copy of value1's key
  110. Integer value = value1.f1 + value2.f1;
  111. return new Tuple2<Tuple2<Integer, Integer>, Integer>(key, value);
  112. }
  113. }
  114. }

$ cat resources/matrixA.csv

  1. 1,1,1
  2. 1,2,3
  3. 1,3,4
  4. 1,4,-2
  5. 2,1,6
  6. 2,2,2
  7. 2,3,-3
  8. 2,4,1

$ cat resources/matrixB.csv

  1. 1,1,1
  2. 1,2,-2
  3. 2,1,4
  4. 2,2,3
  5. 3,1,-3
  6. 3,2,-2
  7. 4,1,0
  8. 4,2,4
ct3nt3jp

ct3nt3jp1#

如果删除这些 print() 调用,就可以打印出执行计划。print() 本身也会触发一次执行。

相关问题