[Flink] Flink Source Code Analysis: Creating the JobGraph in Batch Mode

Reposted by x33g5p2x on 2022-06-27 in the Flink category

1. Overview

Reposted from "Flink Source Code Analysis: Creating the JobGraph in Batch Mode", for personal study only.

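Whether a program runs in streaming or batch mode, Flink compiles it into a JobGraph before submitting it. We analyzed JobGraph creation in streaming mode earlier; now we look at how the JobGraph is created in batch mode.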

This article uses local mode as the example for analyzing JobGraph creation.

We again use WordCount as the running example. The WordCount code:

    import org.apache.flink.api.common.ExecutionMode
    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode

    object WordCount {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // env.getConfig.setExecutionMode(ExecutionMode.BATCH)
        val text = env.fromElements(
          "Who's there?",
          "I think I hear them. Stand, ho! Who's there?",
          "li wen tao li wen tao li wen tao"
        )
        text.flatMap { _.toLowerCase.split("\\W+").filter { _.nonEmpty } }
          .map { (_, 1) }
          .groupBy(0)
          .sum(1)
          .writeAsText("D:\\IDEASPARK\\flink\\wordcount", WriteMode.OVERWRITE)
        env.execute()
      }
    }
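To make the expected result concrete, here is a plain-Java sketch (no Flink involved) of what the flatMap, map, groupBy(0) and sum(1) steps compute for these three input lines. The class name WordCountSketch is hypothetical; it only mirrors the tokenization used in the Scala code, which relies on the same java.lang.String.split.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java equivalent of the WordCount pipeline, for illustration only.
class WordCountSketch {
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only to make printing stable
        for (String line : lines) {
            // flatMap: lowercase, split on runs of non-word characters, drop empty tokens
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // map to (word, 1), then groupBy(0).sum(1)
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> text = Arrays.asList(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?",
                "li wen tao li wen tao li wen tao");
        System.out.println(count(text));
    }
}
```

For this input, li, wen and tao are counted three times each and who, s, there and i twice each. Note that the apostrophe in "Who's" is a non-word character, so it yields the two tokens "who" and "s".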

The DataSet graph that this WordCount program builds looks like this:

    DataSource --> FlatMapOperator --> MapOperator --> ScalaAggregateOperator --> DataSink

Note that these "operators" are not operators at the operator level; they live at the data-set level, and all of them except DataSink are themselves subtypes of DataSet.

Let's start with the entry point. In local mode, LocalEnvironment.execute() is called; it first creates the execution Plan and then executes it:

    // LocalEnvironment
    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }
        Plan p = createProgramPlan(jobName);
        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);
        JobExecutionResult result = executor.executePlan(p);
        this.lastJobExecutionResult = result;
        return result;
    }

The Plan itself is simple: it only holds the sinks. Creating it means converting every DataSet built by the WordCount code into the corresponding operator-level Operator.

2. Creating the execution Plan

First, let's look at the implementation of createProgramPlan():

    // ExecutionEnvironment
    public Plan createProgramPlan(String jobName, boolean clearSinks) {
        ...
        // create a translator and start translating from the sinks
        OperatorTranslation translator = new OperatorTranslation();
        Plan plan = translator.translateToPlan(this.sinks, jobName);
        ...
        return plan;
    }

    // OperatorTranslation
    public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
        List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
        // depth-first traversal, going upward from each sink
        for (DataSink<?> sink : sinks) {
            planSinks.add(translate(sink));
        }
        Plan p = new Plan(planSinks);
        p.setJobName(jobName);
        return p;
    }

    private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {
        // translate the input recursively
        // (the recursion starts at the sink and works upward)
        Operator<T> input = translate(sink.getDataSet());
        // translate the sink itself and connect it to the input
        GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
        translatedSink.setResources(sink.getMinResources(), sink.getPreferredResources());
        return translatedSink;
    }

    private <T> Operator<T> translate(DataSet<T> dataSet) {
        while (dataSet instanceof NoOpOperator) {
            dataSet = ((NoOpOperator<T>) dataSet).getInput();
        }
        // check if we have already translated that data set (operation or source)
        Operator<?> previous = this.translated.get(dataSet);
        if (previous != null) {
            ... // already translated
        }
        Operator<T> dataFlowOp;
        if (dataSet instanceof DataSource) {
            DataSource<T> dataSource = (DataSource<T>) dataSet;
            dataFlowOp = dataSource.translateToDataFlow();
            dataFlowOp.setResources(dataSource.getMinResources(), dataSource.getPreferredResources());
        }
        else if (dataSet instanceof SingleInputOperator) {
            SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
            dataFlowOp = translateSingleInputOperator(singleInputOperator);
            dataFlowOp.setResources(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources());
        }
        else if (dataSet instanceof TwoInputOperator) {
            TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
            dataFlowOp = translateTwoInputOperator(twoInputOperator);
            dataFlowOp.setResources(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources());
        } else if (...) {
            ...
        }
        this.translated.put(dataSet, dataFlowOp);
        // take care of broadcast variables
        translateBcVariables(dataSet, dataFlowOp);
        return dataFlowOp;
    }

    private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
        @SuppressWarnings("unchecked")
        SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op;
        @SuppressWarnings("unchecked")
        DataSet<I> typedInput = (DataSet<I>) op.getInput();
        // on a SingleInputOperator, keep recursing upward; overall this is a post-order traversal
        // from the sink: the source is translated first, then each downstream node in turn
        Operator<I> input = translate(typedInput);
        org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);
        ...
        return dataFlowOp;
    }

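In short, translation starts at the sink and recurses upward. The recursion is a depth-first traversal from the sink, so the source is translated first and every downstream DataSet after it. Each step calls the translateToDataFlow() method of the DataSet (or DataSink) to turn it into an operator-level Operator, passing the already translated upstream Operator in as its input.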
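The memoized, sink-first recursion described above can be sketched as follows. This is a minimal sketch under simplifying assumptions: only single-input chains are modeled, and DataSetNode and LogicalOp are hypothetical stand-ins for DataSet and the operator-level Operator, not Flink classes.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: DataSetNode plays the role of an API-level DataSet,
// LogicalOp the role of an operator-level Operator. Only single-input chains are modeled.
class TranslationSketch {
    static final class DataSetNode {
        final String name;
        final DataSetNode input; // null for a source
        DataSetNode(String name, DataSetNode input) { this.name = name; this.input = input; }
    }

    static final class LogicalOp {
        final String name;
        final LogicalOp input;
        LogicalOp(String name, LogicalOp input) { this.name = name; this.input = input; }
    }

    // Plays the role of the 'translated' map: each DataSet is translated at most once.
    private final Map<DataSetNode, LogicalOp> translated = new IdentityHashMap<>();
    final List<String> translationOrder = new ArrayList<>();

    LogicalOp translate(DataSetNode ds) {
        LogicalOp previous = translated.get(ds);
        if (previous != null) {
            return previous; // reached again through another path
        }
        // Recurse into the input first, so upstream nodes are translated before this one.
        LogicalOp input = ds.input == null ? null : translate(ds.input);
        LogicalOp op = new LogicalOp(ds.name, input);
        translated.put(ds, op);
        translationOrder.add(ds.name);
        return op;
    }

    public static void main(String[] args) {
        DataSetNode source = new DataSetNode("DataSource", null);
        DataSetNode flatMap = new DataSetNode("FlatMap", source);
        DataSetNode map = new DataSetNode("Map", flatMap);
        DataSetNode agg = new DataSetNode("Aggregate", map);
        TranslationSketch t = new TranslationSketch();
        t.translate(agg); // what translate(DataSink) does with sink.getDataSet()
        System.out.println(t.translationOrder); // [DataSource, FlatMap, Map, Aggregate]
    }
}
```

Although the call starts at the node nearest the sink, the source is the first node recorded as translated, which is exactly the post-order behavior described above.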

Below are the translateToDataFlow() methods of each kind of DataSet (and of DataSink):

    // DataSource
    protected GenericDataSourceBase<OUT, ?> translateToDataFlow() {
        String name = this.name != null ? this.name : "at " + dataSourceLocationName + " (" + inputFormat.getClass().getName() + ")";
        if (name.length() > 150) {
            name = name.substring(0, 150);
        }
        @SuppressWarnings({"unchecked", "rawtypes"})
        GenericDataSourceBase<OUT, ?> source = new GenericDataSourceBase(this.inputFormat,
                new OperatorInformation<OUT>(getType()), name);
        source.setParallelism(parallelism);
        if (this.parameters != null) {
            source.getParameters().addAll(this.parameters);
        }
        if (this.splitDataProperties != null) {
            source.setSplitDataProperties(this.splitDataProperties);
        }
        return source;
    }

    // MapOperator
    protected MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
        String name = getName() != null ? getName() : "Map at " + defaultName;
        // create operator
        MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function,
                new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
        // set input
        po.setInput(input);
        // set parallelism
        if (this.getParallelism() > 0) {
            // use specified parallelism
            po.setParallelism(this.getParallelism());
        } else {
            // if no parallelism has been specified, use parallelism of input operator to enable chaining
            po.setParallelism(input.getParallelism());
        }
        return po;
    }

    // ScalaAggregateOperator
    protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> translateToDataFlow(Operator<IN> input) {
        // sanity check
        if (this.aggregationFunctions.isEmpty() || this.aggregationFunctions.size() != this.fields.size()) {
            throw new IllegalStateException();
        }
        // construct the aggregation function
        AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
        int[] fields = new int[this.fields.size()];
        StringBuilder genName = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            aggFunctions[i] = (AggregationFunction<Object>) this.aggregationFunctions.get(i);
            fields[i] = this.fields.get(i);
            genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
        }
        genName.setLength(genName.length() - 1);
        @SuppressWarnings("rawtypes")
        RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(getInputType(), aggFunctions, fields);
        String name = getName() != null ? getName() : genName.toString();
        // distinguish between grouped reduce and non-grouped reduce
        // this branch handles the non-grouped reduce
        if (this.grouping == null) {
            // non grouped aggregation
            UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
            GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
                    new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name);
            po.setCombinable(true);
            // set input
            po.setInput(input);
            // set parallelism
            po.setParallelism(this.getParallelism());
            return po;
        }
        // this branch handles the grouped reduce; our WordCount takes this path
        if (this.grouping.getKeys() instanceof Keys.ExpressionKeys) {
            // grouped aggregation
            int[] logicalKeyPositions = this.grouping.getKeys().computeLogicalKeyPositions();
            UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
            GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
                    new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);
            // the combiner is enabled by default: data is pre-aggregated to reduce network transfer
            po.setCombinable(true);
            // set input
            po.setInput(input);
            // set parallelism
            po.setParallelism(this.getParallelism());
            SingleInputSemanticProperties props = new SingleInputSemanticProperties();
            for (int keyField : logicalKeyPositions) {
                boolean keyFieldUsedInAgg = false;
                for (int aggField : fields) {
                    if (keyField == aggField) {
                        keyFieldUsedInAgg = true;
                        break;
                    }
                }
                if (!keyFieldUsedInAgg) {
                    props.addForwardedField(keyField, keyField);
                }
            }
            po.setSemanticProperties(props);
            po.setCustomPartitioner(grouping.getCustomPartitioner());
            return po;
        }
        else if (this.grouping.getKeys() instanceof Keys.SelectorFunctionKeys) {
            throw new UnsupportedOperationException("Aggregate does not support grouping with KeySelector functions, yet.");
        }
        else {
            throw new UnsupportedOperationException("Unrecognized key type.");
        }
    }

    // DataSink
    protected GenericDataSinkBase<T> translateToDataFlow(Operator<T> input) {
        // select the name (or create a default one)
        String name = this.name != null ? this.name : this.format.toString();
        GenericDataSinkBase<T> sink = new GenericDataSinkBase<>(this.format, new UnaryOperatorInformation<>(this.type, new NothingTypeInfo()), name);
        // set input
        sink.setInput(input);
        // set parameters
        if (this.parameters != null) {
            sink.getParameters().addAll(this.parameters);
        }
        // set parallelism
        if (this.parallelism > 0) {
            // use specified parallelism
            sink.setParallelism(this.parallelism);
        } else {
            // if no parallelism has been specified, use parallelism of input operator to enable chaining
            sink.setParallelism(input.getParallelism());
        }
        if (this.sortKeyPositions != null) {
            // configure output sorting
            Ordering ordering = new Ordering();
            for (int i = 0; i < this.sortKeyPositions.length; i++) {
                ordering.appendOrdering(this.sortKeyPositions[i], null, this.sortOrders[i]);
            }
            sink.setLocalOrder(ordering);
        }
        return sink;
    }
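In the grouped branch of ScalaAggregateOperator.translateToDataFlow(), the final loop declares every grouping key that is not itself aggregated as a forwarded field. The sketch below isolates just that rule; plain int arrays replace SingleInputSemanticProperties, and the class name is hypothetical. For WordCount (groupBy(0).sum(1)) it reports field 0 as forwarded, i.e. the grouping key passes through the reduce unchanged, which is the kind of information the optimizer can later use when reasoning about partitioning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone restatement of the forwarded-field loop; plain arrays replace SingleInputSemanticProperties.
class ForwardedFieldsSketch {
    // Returns the key positions that pass through the aggregation unchanged.
    static int[] forwardedKeys(int[] logicalKeyPositions, int[] aggFields) {
        List<Integer> forwarded = new ArrayList<>();
        for (int keyField : logicalKeyPositions) {
            boolean keyFieldUsedInAgg = false;
            for (int aggField : aggFields) {
                if (keyField == aggField) {
                    keyFieldUsedInAgg = true;
                    break;
                }
            }
            if (!keyFieldUsedInAgg) {
                forwarded.add(keyField); // corresponds to props.addForwardedField(keyField, keyField)
            }
        }
        return forwarded.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        // WordCount: groupBy(0).sum(1) -> key field 0, aggregated field 1
        System.out.println(Arrays.toString(forwardedKeys(new int[]{0}, new int[]{1}))); // [0]
    }
}
```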

After translation, the WordCount program becomes the following chain of operator-level Operators:

    GenericDataSourceBase --> FlatMapOperatorBase --> MapOperatorBase --> GroupReduceOperatorBase --> GenericDataSinkBase

Each upstream operator serves as the input of the next one, linking them into a chain level by level.

3. Compiling into an OptimizedPlan

Next comes the code that executes this plan, i.e. executor.executePlan(p). At a high level, it produces the JobGraph as follows:

  1. Create an Optimizer, which optimizes the Plan and compiles it into an OptimizedPlan.
  2. Create a JobGraphGenerator, which compiles the OptimizedPlan into a JobGraph.

    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }
        synchronized (this.lock) {
            ... // start the local cluster environment
            try {
                // TODO: Set job's default parallelism to max number of slots
                final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
                // the following lines are the key steps in creating the JobGraph
                Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
                OptimizedPlan op = pc.compile(plan);
                JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
                JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
                return jobExecutorService.executeJobBlocking(jobGraph);
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }
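The executePlan() excerpt sets the plan's default parallelism to the total number of local slots, and Optimizer.compile() (in the next section) uses that value only when it is positive, falling back to the optimizer's own default otherwise. A tiny sketch of the arithmetic; the helper class and the example values are hypothetical, not a Flink API.

```java
// Hypothetical helper combining two rules from the excerpts; it is not a Flink API.
class DefaultParallelismSketch {
    // executePlan(): default parallelism = slots per TaskManager * number of TaskManagers
    static int localDefault(int slotsPerTaskManager, int numTaskManagers) {
        return slotsPerTaskManager * numTaskManagers;
    }

    // Optimizer.compile(): use the plan's default if it is positive, else the optimizer's own default
    static int effectiveDefault(int planDefault, int optimizerDefault) {
        return planDefault > 0 ? planDefault : optimizerDefault;
    }

    public static void main(String[] args) {
        int planDefault = localDefault(4, 1); // example values only
        System.out.println(effectiveDefault(planDefault, 1)); // 4
    }
}
```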

4. The Optimizer

The Optimizer compiles the original plan roughly as follows:

  1. Create a GraphCreatingVisitor and apply it to the original Plan, turning each operator into an OptimizerNode. OptimizerNodes are connected by DagConnections. A DagConnection is an edge model with a source and a target, and it represents the input and output of an OptimizerNode.
  2. Optimize the OptimizerNodes further, turning each one into a PlanNode. PlanNodes are connected by Channels; a Channel is also an edge model, representing the input and output of a PlanNode. Many optimizations happen in this step: for example, a combiner node is added for a GroupReduceNode, and every Channel gets a ShipStrategyType and a DataExchangeMode. ShipStrategyType is the strategy for shipping data between two nodes, such as hash partitioning or range partitioning; DataExchangeMode is the mode of data exchange between two nodes, such as PIPELINED or BATCH.

    // Optimizer
    public OptimizedPlan compile(Plan program) throws CompilerException {
        final OptimizerPostPass postPasser = getPostPassFromPlan(program);
        return compile(program, postPasser);
    }

    private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
        ...
        final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
        final int defaultParallelism = program.getDefaultParallelism() > 0 ?
                program.getDefaultParallelism() : this.defaultParallelism;
        ...
        // turn each operator of the original Plan into an OptimizerNode
        GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
        program.accept(graphCreator);
        // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
        // each until we have only a single root node. This allows to transparently deal with the nodes with
        // multiple outputs
        OptimizerNode rootNode;
        if (graphCreator.getSinks().size() == 1) {
            rootNode = graphCreator.getSinks().get(0);
        }
        else if (graphCreator.getSinks().size() > 1) {
            Iterator<DataSinkNode> iter = graphCreator.getSinks().iterator();
            rootNode = iter.next();
            while (iter.hasNext()) {
                rootNode = new SinkJoiner(rootNode, iter.next());
            }
        }
        else {
            throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
        }
        // now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
        // guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks
        rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
        // We need to enforce that union nodes always forward their output to their successor.
        // Any partitioning must be either pushed before or done after the union, but not on the union's output.
        UnionParallelismAndForwardEnforcer unionEnforcer = new UnionParallelismAndForwardEnforcer();
        rootNode.accept(unionEnforcer);
        // We are dealing with operator DAGs, rather than operator trees.
        // That requires us to deviate at some points from the classical DB optimizer algorithms.
        // This step builds auxiliary structures to help track branches and joins in the DAG
        BranchesVisitor branchingVisitor = new BranchesVisitor();
        rootNode.accept(branchingVisitor);
        // Propagate the interesting properties top-down through the graph
        InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
        rootNode.accept(propsVisitor);
        // perform a sanity check: the root may not have any unclosed branches
        if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
            throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
                    "track the re-joining of branches correctly.");
        }
        // the final step is now to generate the actual plan alternatives
        // optimize each OptimizerNode further into a PlanNode
        List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
        if (bestPlan.size() != 1) {
            throw new CompilerException("Error in compiler: more than one best plan was created!");
        }
        // check if the best plan's root is a data sink (single sink plan)
        // if so, directly take it. if it is a sink joiner node, get its contained sinks
        PlanNode bestPlanRoot = bestPlan.get(0);
        List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
        if (bestPlanRoot instanceof SinkPlanNode) {
            bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
        } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
            ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
        }
        // finalize the plan
        // create the final OptimizedPlan
        OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
        plan.accept(new BinaryUnionReplacer());
        plan.accept(new RangePartitionRewriter(plan));
        // post pass the plan. this is the phase where the serialization and comparator code is set
        postPasser.postPass(plan);
        return plan;
    }
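When a plan has several sinks, compile() folds them into a single root by repeatedly wrapping the current root and the next sink in a SinkJoiner. The sketch below models only that folding step; Node, Sink and Joiner are hypothetical stand-ins for OptimizerNode, DataSinkNode and SinkJoiner, and describe() exists only to make the shape of the result visible.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of how compile() folds several sinks into one root via SinkJoiner.
class SinkJoinSketch {
    interface Node {
        String describe();
    }

    static final class Sink implements Node {
        final String name;
        Sink(String name) { this.name = name; }
        public String describe() { return name; }
    }

    static final class Joiner implements Node {
        final Node left;
        final Node right;
        Joiner(Node left, Node right) { this.left = left; this.right = right; }
        public String describe() { return "Joiner(" + left.describe() + ", " + right.describe() + ")"; }
    }

    static Node rootOf(List<? extends Node> sinks) {
        if (sinks.isEmpty()) {
            throw new IllegalStateException("plan has no sinks");
        }
        Iterator<? extends Node> iter = sinks.iterator();
        Node root = iter.next();
        while (iter.hasNext()) {
            root = new Joiner(root, iter.next()); // builds a left-deep chain of joiners
        }
        return root;
    }

    public static void main(String[] args) {
        Node root = rootOf(Arrays.asList(new Sink("A"), new Sink("B"), new Sink("C")));
        System.out.println(root.describe()); // Joiner(Joiner(A, B), C)
    }
}
```

With a single sink, as in WordCount, no joiner is created and the sink itself becomes the root.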

5. Converting Operators into OptimizerNodes

GraphCreatingVisitor turns the original Plan into a graph of OptimizerNodes.

Let's first look at this conversion, which is triggered by program.accept(graphCreator):

    // Plan
    public void accept(Visitor<Operator<?>> visitor) {
        for (GenericDataSinkBase<?> sink : this.sinks) {
            sink.accept(visitor);
        }
    }

    // GenericDataSinkBase
    public void accept(Visitor<Operator<?>> visitor) {
        boolean descend = visitor.preVisit(this);
        if (descend) {
            this.input.accept(visitor);
            visitor.postVisit(this);
        }
    }

    // SingleInputOperator
    public void accept(Visitor<Operator<?>> visitor) {
        if (visitor.preVisit(this)) {
            this.input.accept(visitor);
            for (Operator<?> c : this.broadcastInputs.values()) {
                c.accept(visitor);
            }
            visitor.postVisit(this);
        }
    }

    // GenericDataSourceBase
    public void accept(Visitor<Operator<?>> visitor) {
        if (visitor.preVisit(this)) {
            visitor.postVisit(this);
        }
    }

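As the code shows, accept() is a recursive, depth-first traversal with one hook on the way in and another on the way out. Starting from the sink (GenericDataSinkBase), visitor.preVisit() is called on each operator while moving upward toward the source, and visitor.postVisit() is then called on each operator while returning downward toward the sink.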
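The visiting order can be checked with a minimal sketch. Op and Visitor here are hypothetical simplifications of Flink's Operator and Visitor (single input, no broadcast inputs); the accept() method follows the same preVisit, recurse, postVisit pattern as the excerpts above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-input operator chain with Flink-style preVisit/postVisit hooks.
class VisitOrderSketch {
    interface Visitor {
        boolean preVisit(Op op);
        void postVisit(Op op);
    }

    static final class Op {
        final String name;
        final Op input; // null for a source
        Op(String name, Op input) { this.name = name; this.input = input; }

        void accept(Visitor v) {
            if (v.preVisit(this)) {   // called while walking up from the sink
                if (input != null) {
                    input.accept(v);
                }
                v.postVisit(this);    // called while unwinding, source first
            }
        }
    }

    static List<String> trace(Op sink) {
        List<String> log = new ArrayList<>();
        sink.accept(new Visitor() {
            public boolean preVisit(Op op) { log.add("pre:" + op.name); return true; }
            public void postVisit(Op op) { log.add("post:" + op.name); }
        });
        return log;
    }

    public static void main(String[] args) {
        Op source = new Op("Source", null);
        Op map = new Op("Map", source);
        Op sink = new Op("Sink", map);
        System.out.println(trace(sink));
        // [pre:Sink, pre:Map, pre:Source, post:Source, post:Map, post:Sink]
    }
}
```

Because postVisit() runs on the source first, a visitor such as GraphCreatingVisitor can rely on the predecessor's node already existing when it wires up a node's input.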

Since the core logic lives in visitor.preVisit() and visitor.postVisit(), let's look at these two methods of GraphCreatingVisitor.

preVisit()

    public boolean preVisit(Operator<?> c) {
        // check if we have been here before
        if (this.con2node.containsKey(c)) {
            return false;
        }
        final OptimizerNode n;
        // create a node for the operator (or sink or source) if we have not been here before
        if (c instanceof GenericDataSinkBase) {
            DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c);
            this.sinks.add(dsn);
            n = dsn;
        }
        else if (c instanceof GenericDataSourceBase) {
            n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
        }
        else if (c instanceof MapOperatorBase) {
            n = new MapNode((MapOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof MapPartitionOperatorBase) {
            n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof FlatMapOperatorBase) {
            n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof FilterOperatorBase) {
            n = new FilterNode((FilterOperatorBase<?, ?>) c);
        }
        else if (c instanceof ReduceOperatorBase) {
            n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
        }
        else if (c instanceof GroupCombineOperatorBase) {
            n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof GroupReduceOperatorBase) {
            n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof InnerJoinOperatorBase) {
            n = new JoinNode((InnerJoinOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof OuterJoinOperatorBase) {
            n = new OuterJoinNode((OuterJoinOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CoGroupOperatorBase) {
            n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CoGroupRawOperatorBase) {
            n = new CoGroupRawNode((CoGroupRawOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CrossOperatorBase) {
            n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof BulkIterationBase) {
            n = new BulkIterationNode((BulkIterationBase<?>) c);
        }
        else if (c instanceof DeltaIterationBase) {
            n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c);
        }
        else if (c instanceof Union) {
            n = new BinaryUnionNode((Union<?>) c);
        }
        else if (c instanceof PartitionOperatorBase) {
            n = new PartitionNode((PartitionOperatorBase<?>) c);
        }
        else if (c instanceof SortPartitionOperatorBase) {
            n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
        }
        else if (c instanceof BulkIterationBase.PartialSolutionPlaceHolder) {
            ...
        }
        else if (c instanceof DeltaIterationBase.WorksetPlaceHolder) {
            ...
        }
        else if (c instanceof DeltaIterationBase.SolutionSetPlaceHolder) {
            ...
        }
        else {
            throw new IllegalArgumentException("Unknown operator type: " + c);
        }
        this.con2node.put(c, n);
        // set the parallelism only if it has not been set before. some nodes have a fixed parallelism, such as the
        // key-less reducer (all-reduce)
        if (n.getParallelism() < 1) {
            // set the parallelism
            int par = c.getParallelism();
            if (n instanceof BinaryUnionNode) {
                // Keep parallelism of union undefined for now.
                // It will be determined based on the parallelism of its successor.
                par = -1;
            } else if (par > 0) {
                if (this.forceParallelism && par != this.defaultParallelism) {
                    par = this.defaultParallelism;
                    Optimizer.LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
                            "currently fixed to the parallelism of the surrounding operator (the iteration).");
                }
            } else {
                par = this.defaultParallelism;
            }
            n.setParallelism(par);
        }
        return true;
    }

preVisit() is straightforward: it checks the type of the incoming Operator, creates the corresponding OptimizerNode, and then sets the node's parallelism.
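The parallelism branch at the end of preVisit() can be restated as a pure function. This is a sketch only: the method and parameter names are hypothetical, and the warning that Flink logs when forceParallelism overrides a value is omitted.

```java
// Hypothetical restatement of the parallelism branch in GraphCreatingVisitor.preVisit().
class NodeParallelismSketch {
    static final int UNDEFINED = -1;

    static int resolve(int nodeParallelism, int operatorParallelism, boolean isUnion,
                       boolean forceParallelism, int defaultParallelism) {
        if (nodeParallelism >= 1) {
            return nodeParallelism;        // already fixed, e.g. a key-less all-reduce
        }
        if (isUnion) {
            return UNDEFINED;              // decided later from the union's successor
        }
        if (operatorParallelism > 0) {
            // inside nested dataflows (iterations) the surrounding parallelism wins
            return forceParallelism ? defaultParallelism : operatorParallelism;
        }
        return defaultParallelism;         // nothing set on the operator
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1, -1, false, false, 4)); // 4: falls back to the default
        System.out.println(resolve(-1, 2, false, false, 4));  // 2: operator's own setting
    }
}
```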

postVisit()

    public void postVisit(Operator<?> c) {
        OptimizerNode n = this.con2node.get(c);
        // first connect to the predecessors
        n.setInput(this.con2node, this.defaultDataExchangeMode);
        n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);
        // if the node represents a bulk iteration, we recursively translate the data flow now
        if (n instanceof BulkIterationNode) {
            ...
        }
        else if (n instanceof WorksetIterationNode) {
            ...
        }
    }

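postVisit() is also simple: it sets the input of the OptimizerNode that corresponds to each Operator. Here defaultDataExchangeMode defaults to ExecutionMode.PIPELINED; the default ExecutionMode can be changed with env.getConfig.setExecutionMode(ExecutionMode.BATCH). ExecutionMode describes how data is exchanged between two nodes; the two main modes are PIPELINED and BATCH: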

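  1. PIPELINED: data flows like a pipeline, so upstream and downstream tasks can produce and consume data at the same time.
  2. BATCH: downstream tasks start only after the upstream tasks have processed all their data; the intermediate data is spilled to disk.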
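The difference between the two modes is mostly about ordering. The following is a conceptual, single-threaded simulation that does not use Flink at all: it only records when each record is produced and consumed under each mode. The enum Mode is a local stand-in for ExecutionMode, and the in-memory buffer stands in for the fully materialized (possibly spilled) intermediate result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual simulation only: no Flink involved; it contrasts the two exchange styles.
class ExchangeModeSketch {
    enum Mode { PIPELINED, BATCH }

    static List<String> run(Mode mode, List<String> records) {
        List<String> events = new ArrayList<>();
        if (mode == Mode.PIPELINED) {
            // each record is handed to the consumer as soon as it is produced
            for (String r : records) {
                events.add("produce " + r);
                events.add("consume " + r);
            }
        } else {
            // the producer finishes completely; its output is materialized before consumption starts
            List<String> buffer = new ArrayList<>();
            for (String r : records) {
                events.add("produce " + r);
                buffer.add(r);
            }
            for (String r : buffer) {
                events.add("consume " + r);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> recs = Arrays.asList("a", "b");
        System.out.println(run(Mode.PIPELINED, recs)); // [produce a, consume a, produce b, consume b]
        System.out.println(run(Mode.BATCH, recs));     // [produce a, produce b, consume a, consume b]
    }
}
```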

Now let's look at the setInput() method of each kind of OptimizerNode:

    // DataSourceNode
    public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultDataExchangeMode) {}

    // SingleInputNode
    public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode)
            throws CompilerException
    {
        // see if an internal hint dictates the strategy to use
        final Configuration conf = getOperator().getParameters();
        final String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
        final ShipStrategyType preSet;
        // by default this is null
        if (shipStrategy != null) {
            if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
                preSet = ShipStrategyType.PARTITION_HASH;
            } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) {
                preSet = ShipStrategyType.PARTITION_RANGE;
            } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
                preSet = ShipStrategyType.FORWARD;
            } else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
                preSet = ShipStrategyType.PARTITION_RANDOM;
            } else {
                throw new CompilerException("Unrecognized ship strategy hint: " + shipStrategy);
            }
        } else {
            preSet = null;
        }
        // get the predecessor node
        Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput();
        OptimizerNode pred;
        DagConnection conn;
        if (children == null) {
            throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
        } else {
            pred = contractToNode.get(children);
            conn = new DagConnection(pred, this, defaultExchangeMode);
            if (preSet != null) {
                conn.setShipStrategy(preSet);
            }
        }
        // create the connection and add it
        setIncomingConnection(conn);
        pred.addOutgoingConnection(conn);
    }

    // DataSinkNode
    public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) {
        Operator<?> children = getOperator().getInput();
        final OptimizerNode pred;
        final DagConnection conn;
        pred = contractToNode.get(children);
        conn = new DagConnection(pred, this, defaultExchangeMode);
        // create the connection and add it
        this.input = conn;
        pred.addOutgoingConnection(conn);
    }

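setInput() creates a DagConnection that links two OptimizerNodes together. The DagConnection is an edge model: it is the input of the downstream OptimizerNode and, at the same time, an output of the upstream OptimizerNode. At this point the ShipStrategy and ExecutionMode stored in the DagConnection are still the defaults, not their final values.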

A quick look at the structure of DagConnection:

    public class DagConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
        private final OptimizerNode source; // The source node of the connection
        private final OptimizerNode target; // The target node of the connection.
        private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
        private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in
        private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
        private TempMode materializationMode = TempMode.NONE; // the materialization mode
        private int maxDepth = -1;
        private boolean breakPipeline; // whether this connection breaks the pipeline
        ...
    }

After this first optimization pass, the WordCount plan has the following topology:

    DataSourceNode --> FlatMapNode --> MapNode --> GroupReduceNode --> DataSinkNode

The OptimizerNodes are connected to each other by DagConnections.
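The wiring done by setInput() can be modeled in a few lines: each call creates an edge, stores it as the node's incoming connection, and registers it as an outgoing connection of the predecessor. Node, Edge and ExchangeMode below are hypothetical simplifications of OptimizerNode, DagConnection and ExecutionMode; the sketch rebuilds the WordCount topology and walks it back from the sink.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of OptimizerNode + DagConnection wiring; not Flink's classes.
class DagSketch {
    enum ExchangeMode { PIPELINED, BATCH }

    static final class Node {
        final String name;
        Edge incoming;                                // single-input nodes have one incoming edge
        final List<Edge> outgoing = new ArrayList<>();
        Node(String name) { this.name = name; }

        // Like SingleInputNode.setInput(): create the edge, keep it as input, register it on the predecessor.
        void setInput(Node pred, ExchangeMode mode) {
            Edge conn = new Edge(pred, this, mode);
            this.incoming = conn;
            pred.outgoing.add(conn);
        }
    }

    static final class Edge {
        final Node source;
        final Node target;
        final ExchangeMode mode;
        Edge(Node source, Node target, ExchangeMode mode) {
            this.source = source;
            this.target = target;
            this.mode = mode;
        }
    }

    // Builds the WordCount chain and returns the sink node.
    static Node wordCountChain(ExchangeMode mode) {
        String[] names = {"DataSourceNode", "FlatMapNode", "MapNode", "GroupReduceNode", "DataSinkNode"};
        Node prev = new Node(names[0]);
        for (int i = 1; i < names.length; i++) {
            Node next = new Node(names[i]);
            next.setInput(prev, mode);
            prev = next;
        }
        return prev;
    }

    // Follows incoming edges from the sink back to the source and prints the path source-first.
    static String path(Node sink) {
        StringBuilder sb = new StringBuilder(sink.name);
        for (Node n = sink; n.incoming != null; n = n.incoming.source) {
            sb.insert(0, n.incoming.source.name + " --> ");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(path(wordCountChain(ExchangeMode.PIPELINED)));
    }
}
```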

6. Optimizing OptimizerNodes further into PlanNodes

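The next step optimizes further, turning OptimizerNodes into PlanNodes. PlanNode is the final optimized node type and carries more node properties. PlanNodes are connected by Channels; a Channel is also an edge model, and it fixes how data is exchanged between two nodes through ShipStrategyType and DataExchangeMode. ShipStrategyType is the strategy for shipping data between the nodes, for example whether to partition at all and whether to use hash or range partitioning. DataExchangeMode is the exchange mode between the nodes, such as PIPELINED or BATCH; it corresponds to ExecutionMode, and the ExecutionMode (together with the ship strategy, as the code below shows) determines the DataExchangeMode.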

The implementation is in rootNode.getAlternativePlans(); here rootNode is the DataSinkNode:

    // DataSinkNode
    public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
        // check if we have a cached version
        if (this.cachedPlans != null) {
            return this.cachedPlans;
        }
        // calculate alternative sub-plans for predecessor
        // recursively create the upstream PlanNodes first, then this node (post-order traversal)
        List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
        List<PlanNode> outputPlans = new ArrayList<PlanNode>();
        final int parallelism = getParallelism();
        final int inDop = getPredecessorNode().getParallelism();
        final ExecutionMode executionMode = this.input.getDataExchangeMode();
        final boolean dopChange = parallelism != inDop;
        final boolean breakPipeline = this.input.isBreakingPipeline();
        InterestingProperties ips = this.input.getInterestingProperties();
        for (PlanNode p : subPlans) {
            for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
                for (RequestedLocalProperties lp : ips.getLocalProperties()) {
                    // create a Channel and parameterize it
                    Channel c = new Channel(p);
                    gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline);
                    lp.parameterizeChannel(c);
                    c.setRequiredLocalProps(lp);
                    c.setRequiredGlobalProps(gp);
                    // no need to check whether the created properties meet what we need in case
                    // of ordering or global ordering, because the only interesting properties we have
                    // are what we require
                    // create a SinkPlanNode with the Channel as its input
                    outputPlans.add(new SinkPlanNode(this, "DataSink (" + this.getOperator().getName() + ")", c));
                }
            }
        }
        // cost and prune the plans
        for (PlanNode node : outputPlans) {
            estimator.costOperator(node);
        }
        prunePlanAlternatives(outputPlans);
        this.cachedPlans = outputPlans;
        return outputPlans;
    }

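This code shows that getAlternativePlans() is again recursive: it is a post-order traversal that creates PlanNodes and Channels from the top (source) down to the bottom (sink). Creating PlanNodes and Channels is the core job of getAlternativePlans().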
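The shape of this recursion (cached, post-order, costed, pruned) can be illustrated with a toy model. Everything here is hypothetical: the nodes form a simple chain, each node has made-up costs for its possible connection strategies, and pruning simply keeps the single cheapest candidate, which is a simplification of Flink's prunePlanAlternatives().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical cost-based enumeration with caching, shaped like getAlternativePlans(); costs are made up.
class AlternativePlansSketch {
    static final class Candidate {
        final String description;
        final double cost;
        Candidate(String description, double cost) { this.description = description; this.cost = cost; }
    }

    static final class Node {
        final String name;
        final Node predecessor;          // null for a source
        final double[] strategyCosts;    // one entry per hypothetical way of connecting to the predecessor
        List<Candidate> cachedPlans;     // mirrors the cachedPlans field
        int computeCount = 0;

        Node(String name, Node predecessor, double... strategyCosts) {
            this.name = name;
            this.predecessor = predecessor;
            this.strategyCosts = strategyCosts;
        }

        List<Candidate> getAlternativePlans() {
            if (cachedPlans != null) {
                return cachedPlans;
            }
            computeCount++;
            List<Candidate> out = new ArrayList<>();
            if (predecessor == null) {
                out.add(new Candidate(name, 0.0));
            } else {
                // post-order: the predecessor's plans first, then combine them with each local choice
                for (Candidate sub : predecessor.getAlternativePlans()) {
                    for (int s = 0; s < strategyCosts.length; s++) {
                        out.add(new Candidate(sub.description + " -[" + s + "]-> " + name,
                                sub.cost + strategyCosts[s]));
                    }
                }
                // simplified pruning: keep only the cheapest candidate
                Candidate best = out.stream().min(Comparator.comparingDouble(c -> c.cost)).get();
                out = new ArrayList<>();
                out.add(best);
            }
            cachedPlans = out;
            return out;
        }
    }

    public static void main(String[] args) {
        Node source = new Node("Source", null);
        Node map = new Node("Map", source, 1.0);
        Node sink = new Node("Sink", map, 5.0, 2.0);
        Candidate best = sink.getAlternativePlans().get(0);
        System.out.println(best.description + " cost=" + best.cost); // Source -[0]-> Map -[1]-> Sink cost=3.0
    }
}
```

The cache matters in a DAG: a node shared by several consumers computes its alternatives only once.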

The Channel data structure is shown below; its two most important fields hold the ShipStrategyType and the DataExchangeMode:

    public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {
        private PlanNode source;
        private PlanNode target;
        private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
        private DataExchangeMode dataExchangeMode;
        private LocalStrategy localStrategy = LocalStrategy.NONE;
        private FieldList shipKeys;
        private FieldList localKeys;
        private boolean[] shipSortOrder;
        private boolean[] localSortOrder;
        ...
    }

Let's first analyze how the sink side creates a Channel and parameterizes it. The key is RequestedGlobalProperties.parameterizeChannel(), which sets the Channel's ShipStrategyType and DataExchangeMode:

    // RequestedGlobalProperties
    public void parameterizeChannel(Channel channel, boolean globalDopChange,
                                    ExecutionMode exchangeMode, boolean breakPipeline) {
        ...
        // if we request nothing, then we need no special strategy. forward, if the number of instances remains
        // the same, randomly repartition otherwise
        // this branch typically applies to MapNode, FilterNode, etc.
        if (isTrivial() || this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
            ShipStrategyType shipStrategy = globalDopChange ? ShipStrategyType.PARTITION_RANDOM :
                    ShipStrategyType.FORWARD;
            DataExchangeMode em = DataExchangeMode.select(exchangeMode, shipStrategy, breakPipeline);
            channel.setShipStrategy(shipStrategy, em);
            return;
        }
        final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
        // if we have no global parallelism change, check if we have already compatible global properties
        // DataSinkNode and GroupCombineNode take this branch
        if (!globalDopChange && isMetBy(inGlobals)) {
            DataExchangeMode em = DataExchangeMode.select(exchangeMode, ShipStrategyType.FORWARD, breakPipeline);
            channel.setShipStrategy(ShipStrategyType.FORWARD, em);
            return;
        }
        // if we fall through the conditions until here, we need to re-establish
        ShipStrategyType shipType;
        FieldList partitionKeys;
        boolean[] sortDirection;
        Partitioner<?> partitioner;
        switch (this.partitioning) {
            case FULL_REPLICATION:
                shipType = ShipStrategyType.BROADCAST;
                partitionKeys = null;
                sortDirection = null;
                partitioner = null;
                break;
            case ANY_PARTITIONING:
                // ANY_PARTITIONING falls through to the HASH_PARTITIONED case; GroupReduceNode takes this path
            case HASH_PARTITIONED:
                shipType = ShipStrategyType.PARTITION_HASH;
                partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
                sortDirection = null;
                partitioner = null;
                break;
            case RANGE_PARTITIONED:
                shipType = ShipStrategyType.PARTITION_RANGE;
                partitionKeys = this.ordering.getInvolvedIndexes();
                sortDirection = this.ordering.getFieldSortDirections();
                partitioner = null;
                if (this.dataDistribution != null) {
                    channel.setDataDistribution(this.dataDistribution);
                }
                break;
            case FORCED_REBALANCED:
                shipType = ShipStrategyType.PARTITION_FORCED_REBALANCE;
                partitionKeys = null;
                sortDirection = null;
                partitioner = null;
                break;
            case CUSTOM_PARTITIONING:
                shipType = ShipStrategyType.PARTITION_CUSTOM;
                partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
                sortDirection = null;
                partitioner = this.customPartitioner;
                break;
            default:
                throw new CompilerException("Invalid partitioning to create through a data exchange: "
                        + this.partitioning.name());
        }
        DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
        channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
    }

As the code shows, a Channel's ShipStrategyType and DataExchangeMode depend on the partitioning property requested by the current node and on the ExecutionMode configured for the program. MapNode, FilterNode and FlatMapNode request PartitioningProperty.ANY_DISTRIBUTION, GroupCombineNode and DataSinkNode request PartitioningProperty.RANDOM_PARTITIONED, and GroupReduceNode requests PartitioningProperty.ANY_PARTITIONING.

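WordCount runs with parallelism 1 everywhere, so there is no parallelism change. In that case, MapNode, FilterNode, FlatMapNode, GroupCombineNode and DataSinkNode all get the ShipStrategyType FORWARD, and GroupReduceNode gets PARTITION_HASH.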
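The branching in parameterizeChannel() comes down to a small decision function. The sketch below is a simplified, stand-alone model written for this article, not Flink code. The enums only copy the constant names used above, and `alreadyMet` stands in for `isMetBy(inGlobals)`. Treating `RANDOM_PARTITIONED` as a trivial request is an assumption of this sketch. Data distributions and custom partitioner objects are left out.

```java
// Simplified model of RequestedGlobalProperties.parameterizeChannel(); not Flink's API.
final class ShipStrategySketch {
    enum Partitioning { ANY_DISTRIBUTION, RANDOM_PARTITIONED, ANY_PARTITIONING, HASH_PARTITIONED,
        RANGE_PARTITIONED, FULL_REPLICATION, FORCED_REBALANCED, CUSTOM_PARTITIONING }

    enum ShipStrategy { FORWARD, PARTITION_RANDOM, PARTITION_HASH, PARTITION_RANGE,
        BROADCAST, PARTITION_FORCED_REBALANCE, PARTITION_CUSTOM }

    static ShipStrategy choose(Partitioning requested, boolean dopChange, boolean alreadyMet) {
        // Trivial request (assumption: null or RANDOM_PARTITIONED) or ANY_DISTRIBUTION:
        // forward unless the parallelism changes.
        if (requested == null || requested == Partitioning.RANDOM_PARTITIONED
                || requested == Partitioning.ANY_DISTRIBUTION) {
            return dopChange ? ShipStrategy.PARTITION_RANDOM : ShipStrategy.FORWARD;
        }
        // Same parallelism and the input already satisfies the request: forward.
        if (!dopChange && alreadyMet) {
            return ShipStrategy.FORWARD;
        }
        // Otherwise the partitioning has to be re-established by a data exchange.
        switch (requested) {
            case FULL_REPLICATION: return ShipStrategy.BROADCAST;
            case ANY_PARTITIONING: // shares the HASH_PARTITIONED branch, as in the original
            case HASH_PARTITIONED: return ShipStrategy.PARTITION_HASH;
            case RANGE_PARTITIONED: return ShipStrategy.PARTITION_RANGE;
            case FORCED_REBALANCED: return ShipStrategy.PARTITION_FORCED_REBALANCE;
            case CUSTOM_PARTITIONING: return ShipStrategy.PARTITION_CUSTOM;
            default: throw new IllegalArgumentException("Invalid partitioning: " + requested);
        }
    }
}
```

Without a parallelism change, as in WordCount, `choose(ANY_DISTRIBUTION, false, false)` returns FORWARD and `choose(ANY_PARTITIONING, false, false)` returns PARTITION_HASH. These match the outcomes listed above.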

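The code that selects the DataExchangeMode is shown below. Setting an ExecutionMode does not guarantee that the resulting DataExchangeMode matches it, because the choice also depends on the ShipStrategyType. Take the DataSink: its channel is FORWARD, so even with ExecutionMode=BATCH the final DataExchangeMode is still PIPELINED.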

```java
// DataExchangeMode
public static DataExchangeMode select(ExecutionMode executionMode, ShipStrategyType shipStrategy,
        boolean breakPipeline) {
    if (shipStrategy == null || shipStrategy == ShipStrategyType.NONE) {
        throw new IllegalArgumentException("shipStrategy may not be null or NONE");
    }
    if (executionMode == null) {
        throw new IllegalArgumentException("executionMode may not be null");
    }
    if (breakPipeline) {
        return getPipelineBreakingExchange(executionMode);
    }
    else if (shipStrategy == ShipStrategyType.FORWARD) {
        return getForForwardExchange(executionMode);
    }
    else {
        return getForShuffleOrBroadcast(executionMode);
    }
}

public static DataExchangeMode getForForwardExchange(ExecutionMode mode) {
    return FORWARD[mode.ordinal()];
}

public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode) {
    return SHUFFLE[mode.ordinal()];
}

public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode) {
    return BREAKING[mode.ordinal()];
}

private static final DataExchangeMode[] FORWARD = new DataExchangeMode[ExecutionMode.values().length];
private static final DataExchangeMode[] SHUFFLE = new DataExchangeMode[ExecutionMode.values().length];
private static final DataExchangeMode[] BREAKING = new DataExchangeMode[ExecutionMode.values().length];

// initialize the map between execution modes and exchange modes in
static {
    FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
    SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
    BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;

    FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
    SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
    BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH;

    FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED;
    SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH;
    BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH;

    FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
    SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
    BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
}
```
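The stand-alone re-creation below shows the effect of the three lookup tables at a glance. It uses an EnumMap in place of the ordinal-indexed arrays. The enums are local stand-ins that only reuse Flink's constant names, and the table rows are copied from the static initializer above.

```java
import java.util.EnumMap;
import java.util.Map;

// Local re-creation of the FORWARD / SHUFFLE / BREAKING tables; not Flink's classes.
final class ExchangeModeTable {
    enum ExecutionMode { PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED }
    enum DataExchangeMode { PIPELINED, BATCH }

    private static final Map<ExecutionMode, DataExchangeMode> FORWARD = new EnumMap<>(ExecutionMode.class);
    private static final Map<ExecutionMode, DataExchangeMode> SHUFFLE = new EnumMap<>(ExecutionMode.class);
    private static final Map<ExecutionMode, DataExchangeMode> BREAKING = new EnumMap<>(ExecutionMode.class);

    static {
        // execution mode -> (forward, shuffle/broadcast, pipeline-breaking)
        row(ExecutionMode.PIPELINED_FORCED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
        row(ExecutionMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH);
        row(ExecutionMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.BATCH);
        row(ExecutionMode.BATCH_FORCED, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH);
    }

    private static void row(ExecutionMode m, DataExchangeMode fwd, DataExchangeMode shuffle, DataExchangeMode breaking) {
        FORWARD.put(m, fwd);
        SHUFFLE.put(m, shuffle);
        BREAKING.put(m, breaking);
    }

    // Same precedence as DataExchangeMode.select(): pipeline breaker first, then forward vs. shuffle.
    static DataExchangeMode select(ExecutionMode mode, boolean forwardShip, boolean breakPipeline) {
        if (breakPipeline) {
            return BREAKING.get(mode);
        }
        return forwardShip ? FORWARD.get(mode) : SHUFFLE.get(mode);
    }
}
```

`select(ExecutionMode.BATCH, true, false)` returns PIPELINED, which is the DataSink case described earlier. A shuffle under BATCH returns BATCH.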

Since the calls recurse upwards, let's now look at getAlternativePlans() in SingleInputNode:

```java
// SingleInputNode
public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
    // check if we have a cached version
    if (this.cachedPlans != null) {
        return this.cachedPlans;
    }
    boolean childrenSkippedDueToReplicatedInput = false;
    // calculate alternative sub-plans for predecessor
    // This is also a recursive upward call: first obtain the PlanNodes of the predecessor
    final List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
    final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();
    // ...
    final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
    final ExecutionMode executionMode = this.inConn.getDataExchangeMode();
    final int parallelism = getParallelism();
    final int inParallelism = getPredecessorNode().getParallelism();
    final boolean parallelismChange = inParallelism != parallelism;
    final boolean breaksPipeline = this.inConn.isBreakingPipeline();
    // create all candidates
    for (PlanNode child : subPlans) {
        // ...
        if (this.inConn.getShipStrategy() == null) {
            // pick the strategy ourselves
            for (RequestedGlobalProperties igps : intGlobal) {
                // Create the Channel and parameterize it
                final Channel c = new Channel(child, this.inConn.getMaterializationMode());
                igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline);
                // ...
                for (RequestedGlobalProperties rgps : allValidGlobals) {
                    if (rgps.isMetBy(c.getGlobalProperties())) {
                        c.setRequiredGlobalProps(rgps);
                        // Create the PlanNode for the current node and add it to outputPlans
                        addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
                        break;
                    }
                }
            }
        } else {
            // ...
        }
    }
    // ...
    return outputPlans;
}
```

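The first part is the same as before. After obtaining the PlanNodes of the parent node, the method creates a Channel and sets its ShipStrategyType and DataExchangeMode. The PlanNode itself is created in addLocalCandidates(), which eventually calls the instantiate() method of the OperatorDescriptorSingle belonging to each SingleInputNode.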

```java
// MapDescriptor
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
    return new SingleInputPlanNode(node, "Map (" + node.getOperator().getName() + ")", in, DriverStrategy.MAP);
}

// FlatMapDescriptor
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
    return new SingleInputPlanNode(node, "FlatMap (" + node.getOperator().getName() + ")", in, DriverStrategy.FLAT_MAP);
}

// FilterDescriptor: a filter is executed with the FLAT_MAP driver strategy
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
    return new SingleInputPlanNode(node, "Filter (" + node.getOperator().getName() + ")", in, DriverStrategy.FLAT_MAP);
}
```
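The upward recursion and the `cachedPlans` check can be modeled in a few lines. This is a hypothetical, heavily simplified sketch: it keeps one candidate per predecessor plan and ignores costs, interesting properties and driver strategies. All class names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the recursive, cached getAlternativePlans(); not Flink's classes.
final class PlanEnumerationSketch {
    static final class Channel {
        final PlanNode source;
        Channel(PlanNode source) { this.source = source; }
    }

    static final class PlanNode {
        final String name;
        final Channel input; // null for a source
        PlanNode(String name, Channel input) { this.name = name; this.input = input; }
    }

    static final class OptimizerNode {
        final String name;
        final OptimizerNode predecessor; // null for a source
        private List<PlanNode> cachedPlans;
        int computeCount = 0; // how often plans were actually built (for the demo)

        OptimizerNode(String name, OptimizerNode predecessor) {
            this.name = name;
            this.predecessor = predecessor;
        }

        List<PlanNode> getAlternativePlans() {
            if (cachedPlans != null) {
                return cachedPlans; // reuse, like this.cachedPlans above
            }
            computeCount++;
            List<PlanNode> out = new ArrayList<>();
            if (predecessor == null) {
                out.add(new PlanNode(name, null)); // a source has no input channel
            } else {
                // Ask the predecessor first, then wrap each of its plans in a Channel + PlanNode.
                for (PlanNode child : predecessor.getAlternativePlans()) {
                    out.add(new PlanNode(name, new Channel(child)));
                }
            }
            cachedPlans = Collections.unmodifiableList(out);
            return cachedPlans;
        }
    }

    // Follows input channels back to the source; returns names in source-to-sink order.
    static List<String> lineage(PlanNode node) {
        List<String> names = new ArrayList<>();
        for (PlanNode n = node; n != null; n = n.input == null ? null : n.input.source) {
            names.add(0, n.name);
        }
        return names;
    }
}
```

Asking the sink for its plans runs the whole upward recursion once. A second call is answered from the cache.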

The part to focus on is how the GroupReduceNode creates its PlanNode:

```java
// GroupReduceWithCombineProperties
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
    if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
        if (in.getSource().getOptimizerNode() instanceof PartitionNode) {
            LOG.warn("Cannot automatically inject combiner for GroupReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator.");
        }
        // adjust a sort (changes grouping, so it must be for this driver to combining sort
        if (in.getLocalStrategy() == LocalStrategy.SORT) {
            if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
                throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
            }
            in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
                    in.getLocalStrategySortOrder());
        }
        return new SingleInputPlanNode(node, "Reduce (" + node.getOperator().getName() + ")", in,
                DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
    } else {
        // non forward case. all local properties are killed anyways, so we can safely plug in a combiner
        // Create an extra Channel for the combiner, fixed to ShipStrategyType.FORWARD and DataExchangeMode.PIPELINED
        Channel toCombiner = new Channel(in.getSource());
        toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
        // create an input node for combine with same parallelism as input node
        GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
        combinerNode.setParallelism(in.getSource().getParallelism());
        // Create the combiner's SingleInputPlanNode; its parent is the parent of the original GroupReduceNode
        SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine (" + node.getOperator()
                .getName() + ")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
        combiner.setCosts(new Costs(0, 0));
        combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
        // set sorting comparator key info
        combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
        // set grouping comparator key info
        combiner.setDriverKeyInfo(this.keyList, 1);
        // Create the reduce-side Channel (combiner -> reducer), keeping the original ship strategy
        Channel toReducer = new Channel(combiner);
        toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
                in.getShipStrategySortOrder(), in.getDataExchangeMode());
        if (in.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
            toReducer.setDataDistribution(in.getDataDistribution());
        }
        toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
                in.getLocalStrategySortOrder());
        // Create the SingleInputPlanNode for the GroupReduceNode itself
        return new SingleInputPlanNode(node, "Reduce (" + node.getOperator().getName() + ")",
                toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
    }
}
```

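When its input channel is not FORWARD, the GroupReduceNode creates two PlanNodes: one (GroupCombine) for the combine step and one (GroupReduce) for the reduce step. In the FORWARD branch, only the reduce PlanNode is created.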
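The two branches of GroupReduceWithCombineProperties.instantiate() differ only in where the combiner goes. The following is a hypothetical model, not Flink's classes, that keeps only node names and input ship strategies. For a non-FORWARD input, it inserts a combine node fed by a FORWARD channel and moves the original shuffle behind it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of combiner injection in GroupReduceWithCombineProperties.instantiate().
final class CombinerInjectionSketch {
    enum Ship { FORWARD, PARTITION_HASH }

    static final class Node {
        final String name;
        final Node source;  // upstream node, null for the first node
        final Ship inShip;  // ship strategy of this node's input channel
        Node(String name, Node source, Ship inShip) { this.name = name; this.source = source; this.inShip = inShip; }
    }

    static Node instantiateGroupReduce(String opName, Node upstream, Ship requested) {
        if (requested == Ship.FORWARD) {
            // Data is already where it needs to be: a single reduce node.
            return new Node("Reduce (" + opName + ")", upstream, Ship.FORWARD);
        }
        // The combiner reads from the original upstream node over a local FORWARD channel...
        Node combiner = new Node("Combine (" + opName + ")", upstream, Ship.FORWARD);
        // ...and the original (e.g. hash) shuffle now sits between combiner and reducer.
        return new Node("Reduce (" + opName + ")", combiner, requested);
    }

    // Renders the path from the first node to n as "name[inputShip]" entries.
    static List<String> path(Node n) {
        List<String> out = new ArrayList<>();
        for (Node cur = n; cur != null; cur = cur.source) {
            out.add(0, cur.source == null ? cur.name : cur.name + "[" + cur.inShip + "]");
        }
        return out;
    }
}
```

With a hash-partitioned input behind the Map node, the path becomes `Map`, `Combine (SUM(1))[FORWARD]`, `Reduce (SUM(1))[PARTITION_HASH]`, which mirrors the GroupCombine and GroupReduce PlanNodes described above.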

Finally, let's look at getAlternativePlans() of the source node. It simply creates a SourcePlanNode, and because a source has no input, no Channel is created.

```java
// source node (DataSourceNode)
public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
    if (this.cachedPlans != null) {
        return this.cachedPlans;
    }
    SourcePlanNode candidate = new SourcePlanNode(this, "DataSource (" + this.getOperator().getName() + ")",
            this.gprops, this.lprops);
    // ...
    // since there is only a single plan for the data-source, return a list with that element only
    List<PlanNode> plans = new ArrayList<PlanNode>(1);
    plans.add(candidate);
    this.cachedPlans = plans;
    return plans;
}
```

Once getAlternativePlans() has finished, all PlanNodes exist. The WordCount topology now looks like this:

```
SourcePlanNode --> SingleInputPlanNode(FlatMap) --> SingleInputPlanNode(Map) --> SingleInputPlanNode(GroupCombine) --> SingleInputPlanNode(GroupReduce) --> SinkPlanNode
```

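The PlanNodes are linked by Channels. A Channel describes how data moves between two nodes, including its ship (partitioning) strategy and its data exchange mode.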

7. Wrapping into an OptimizedPlan

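Once all PlanNodes have been created, they are wrapped into an OptimizedPlan in PlanFinalizer.createFinalPlan(). Roughly, this method traverses the graph from the sinks and adds the nodes to sources, sinks and allNodes. Along the way it links every Channel to both its source and its target, and it may also assign each node the memory its task will use.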

```java
// PlanFinalizer
public OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
    this.memoryConsumerWeights = 0;
    // traverse the graph
    for (SinkPlanNode node : sinks) {
        node.accept(this);
    }
    // assign the memory to each node
    // ...
    return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
}

public boolean preVisit(PlanNode visitable) {
    // if we come here again, prevent a further descend
    if (!this.allNodes.add(visitable)) {
        return false;
    }
    if (visitable instanceof SinkPlanNode) {
        this.sinks.add((SinkPlanNode) visitable);
    }
    else if (visitable instanceof SourcePlanNode) {
        this.sources.add((SourcePlanNode) visitable);
    }
    // ...
    // double-connect the connections. previously, only parents knew their children, because
    // one child candidate could have been referenced by multiple parents.
    for (Channel conn : visitable.getInputs()) {
        conn.setTarget(visitable);
        conn.getSource().addOutgoingChannel(conn);
    }
    for (Channel c : visitable.getBroadcastInputs()) {
        c.setTarget(visitable);
        c.getSource().addOutgoingChannel(c);
    }
    // count the memory consumption
    // ...
    return true;
}

@Override
public void postVisit(PlanNode visitable) {}
```
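The double-connect step in preVisit() is what gives each PlanNode its list of outgoing channels, which createSingleInputVertex() later checks with `pred.getOutgoingChannels().size() == 1`. The following is a minimal sketch with hypothetical Node/Channel types. It treats any node without inputs as a source, whereas Flink uses instanceof checks.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of PlanFinalizer.preVisit()'s bookkeeping; not Flink's classes.
final class FinalizerSketch {
    static final class Channel {
        final Node source;
        Node target; // only known after finalization
        Channel(Node source) { this.source = source; }
    }

    static final class Node {
        final String name;
        final List<Channel> inputs = new ArrayList<>();
        final List<Channel> outgoing = new ArrayList<>();
        Node(String name, Node... preds) {
            this.name = name;
            for (Node p : preds) {
                inputs.add(new Channel(p)); // initially only the consumer knows its inputs
            }
        }
    }

    final Set<Node> allNodes = new LinkedHashSet<>();
    final List<Node> sources = new ArrayList<>();

    void visit(Node node) {
        if (!allNodes.add(node)) {
            return; // seen before: do not descend (or link) again
        }
        if (node.inputs.isEmpty()) {
            sources.add(node);
        }
        // Double-connect: the channel learns its target, the source learns its outgoing channel.
        for (Channel c : node.inputs) {
            c.target = node;
            c.source.outgoing.add(c);
        }
        for (Channel c : node.inputs) {
            visit(c.source);
        }
    }
}
```

Consider a diamond, with one source feeding two nodes that both feed the sink. The source is reached twice, but the `allNodes.add` check stops the second descent. Both incoming channels have already been linked by then, so the source ends up with two outgoing channels.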

At this point the execution plan has been compiled. The next step is to generate the JobGraph from it.

8. Creating the JobGraph

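The JobGraph is created in JobGraphGenerator.compileJobGraph(). Most of the work happens in OptimizedPlan.accept(), which creates all the vertices, edges and intermediate results of the JobGraph (JobVertex, JobEdge and IntermediateDataSet). The calling code looks like this: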

```java
Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
OptimizedPlan op = pc.compile(plan);
JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
```

The main steps are:

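1. The core method is program.accept(), which calls JobGraphGenerator.preVisit() and JobGraphGenerator.postVisit(). preVisit() creates the JobVertex instances. postVisit() connects them and creates the JobEdges and the intermediate results (IntermediateDataSet).
2. The information of every chained node is added to the configuration of the JobVertex it belongs to.
3. A JobGraph instance is created, all JobVertex instances from step 1 are added to it, and the instance is returned.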

```java
// JobGraphGenerator
public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
    if (program == null) {
        throw new NullPointerException("Program is null, did you call " +
                "ExecutionEnvironment.execute()");
    }
    if (jobId == null) {
        jobId = JobID.generate();
    }
    this.vertices = new HashMap<PlanNode, JobVertex>();
    this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
    this.chainedTasksInSequence = new ArrayList<TaskInChain>();
    this.auxVertices = new ArrayList<JobVertex>();
    this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
    this.iterationStack = new ArrayList<IterationPlanNode>();
    this.sharingGroup = new SlotSharingGroup();
    // this starts the traversal that generates the job graph
    // The core step of JobGraph creation
    program.accept(this);
    // ...
    // now that the traversal is done, we have the chained tasks write their configs into their
    // parents' configurations
    // Add the chained nodes to the configuration of the JobVertex that contains them
    for (TaskInChain tic : this.chainedTasksInSequence) {
        TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
        t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
    }
    // ----- attach the additional info to the job vertices, for display in the runtime monitor
    attachOperatorNamesAndDescriptions();
    // ----------- finalize the job graph -----------
    // create the job graph object
    // Create the JobGraph object and add all vertices created above to it
    JobGraph graph = new JobGraph(jobId, program.getJobName());
    try {
        graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
    }
    catch (IOException e) {
        throw new CompilerException("Could not serialize the ExecutionConfig. " +
                "This indicates that non-serializable types (like custom serializers) were registered");
    }
    graph.setAllowQueuedScheduling(false);
    graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
    // add vertices to the graph
    for (JobVertex vertex : this.vertices.values()) {
        vertex.setInputDependencyConstraint(program.getOriginalPlan().getExecutionConfig().getDefaultInputDependencyConstraint());
        graph.addVertex(vertex);
    }
    for (JobVertex vertex : this.auxVertices) {
        graph.addVertex(vertex);
        vertex.setSlotSharingGroup(sharingGroup);
    }
    Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts =
            program.getOriginalPlan().getCachedFiles().stream()
                    .map(entry -> Tuple2.of(entry.getKey(), entry.getValue()))
                    .collect(Collectors.toList());
    addUserArtifactEntries(userArtifacts, graph);
    // release all references again
    this.vertices = null;
    this.chainedTasks = null;
    this.chainedTasksInSequence = null;
    this.auxVertices = null;
    this.iterations = null;
    this.iterationStack = null;
    // return job graph
    return graph;
}
```

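This accept() works like the earlier one. Starting from the sinks, it calls visitor.preVisit() on each PlanNode from bottom to top (sink to source). As the recursion unwinds, it then calls visitor.postVisit() from top to bottom (source to sink). Here the visitor is the JobGraphGenerator.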

```java
// OptimizedPlan
public void accept(Visitor<PlanNode> visitor) {
    for (SinkPlanNode node : this.dataSinks) {
        node.accept(visitor);
    }
}

// SinkPlanNode and SingleInputPlanNode
public void accept(Visitor<PlanNode> visitor) {
    if (visitor.preVisit(this)) {
        this.input.getSource().accept(visitor);
        for (Channel broadcastInput : getBroadcastInputs()) {
            broadcastInput.getSource().accept(visitor);
        }
        visitor.postVisit(this);
    }
}

// SourcePlanNode
public void accept(Visitor<PlanNode> visitor) {
    if (visitor.preVisit(this)) {
        visitor.postVisit(this);
    }
}
```
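A tiny stand-alone model of this visitor pattern shows the order in which the JobGraphGenerator callbacks fire. The Node and Visitor types below are hypothetical and only cover single-input chains without broadcast inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of accept()/Visitor: preVisit on the way down (sink -> source),
// postVisit on the way back up (source -> sink).
final class VisitOrderSketch {
    interface Visitor {
        boolean preVisit(Node n);
        void postVisit(Node n);
    }

    static final class Node {
        final String name;
        final Node input; // null for a source
        Node(String name, Node input) { this.name = name; this.input = input; }

        void accept(Visitor v) {
            if (v.preVisit(this)) {
                if (input != null) {
                    input.accept(v); // descend to the predecessor first
                }
                v.postVisit(this);
            }
        }
    }

    // Records every callback so the order can be inspected.
    static final class RecordingVisitor implements Visitor {
        final List<String> events = new ArrayList<>();
        public boolean preVisit(Node n) { events.add("pre:" + n.name); return true; }
        public void postVisit(Node n) { events.add("post:" + n.name); }
    }
}
```

For source → map → sink, the recorded events are pre:sink, pre:map, pre:source, post:source, post:map, post:sink. Vertices are therefore created sink-first and connected source-first.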

9. Creating the JobVertex instances

Let's start with JobGraphGenerator.preVisit(), which is where the vertices of the JobGraph are created. Three node types matter here: SinkPlanNode, SourcePlanNode and SingleInputPlanNode.

```java
public boolean preVisit(PlanNode node) {
    // check if we have visited this node before. in non-tree graphs, this happens
    if (this.vertices.containsKey(node) || this.chainedTasks.containsKey(node) || this.iterations.containsKey(node)) {
        // return false to prevent further descend
        return false;
    }
    // the vertex to be created for the current node
    final JobVertex vertex;
    try {
        if (node instanceof SinkPlanNode) {
            vertex = createDataSinkVertex((SinkPlanNode) node);
        }
        else if (node instanceof SourcePlanNode) {
            vertex = createDataSourceVertex((SourcePlanNode) node);
        }
        // ...
        else if (node instanceof SingleInputPlanNode) {
            vertex = createSingleInputVertex((SingleInputPlanNode) node);
        }
        // ...
        else {
            throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
        }
    }
    catch (Exception e) {
        throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
    }
    // check if a vertex was created, or if it was chained or skipped
    if (vertex != null) {
        // set parallelism
        int pd = node.getParallelism();
        vertex.setParallelism(pd);
        vertex.setMaxParallelism(pd);
        vertex.setSlotSharingGroup(sharingGroup);
        // check whether this vertex is part of an iteration step function
        // ...
        // store in the map
        this.vertices.put(node, vertex);
    }
    // returning true causes deeper descend
    return true;
}
```

10. Creating the JobVertex for the sink node

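OutputFormatVertex extends JobVertex and serves as the JobVertex of the sink node. Its task type is DataSinkTask. This shows that the sink is never chained with other nodes. It always forms a vertex of its own, and at runtime it also runs as a separate group of tasks. This differs from streaming mode.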

```java
private JobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
    final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName());
    final TaskConfig config = new TaskConfig(vertex.getConfiguration());
    vertex.setResources(node.getMinResources(), node.getPreferredResources());
    vertex.setInvokableClass(DataSinkTask.class);
    vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
    // set user code
    config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
    config.setStubParameters(node.getProgramOperator().getParameters());
    return vertex;
}
```

11. Creating the JobVertex for the source node

InputFormatVertex also extends JobVertex and serves as the JobVertex of the source node. Its task type is DataSourceTask.

```java
private InputFormatVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
    final InputFormatVertex vertex = new InputFormatVertex(node.getNodeName());
    final TaskConfig config = new TaskConfig(vertex.getConfiguration());
    vertex.setResources(node.getMinResources(), node.getPreferredResources());
    vertex.setInvokableClass(DataSourceTask.class);
    vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
    // set user code
    config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
    config.setStubParameters(node.getProgramOperator().getParameters());
    config.setOutputSerializer(node.getSerializer());
    return vertex;
}
```

12. Creating the JobVertex for a SingleInputPlanNode

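This is how most vertices are created. The method first checks whether the current node can be chained to its predecessor. If it can, the node goes into chainedTasks. If it cannot, a new JobVertex is created, and its task type is BatchTask when no iteration is involved. Chaining requires roughly the following conditions: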

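1. The node's driver strategy has a push chain driver class (getPushChainDriverClass() is not null). This class is the driver used when the node runs as a chained task.
2. The predecessor node is not a NAryUnionPlanNode, BulkPartialSolutionPlanNode, WorksetPlanNode or IterationPlanNode.
3. The ship strategy of the input channel is FORWARD.
4. The local strategy of the input channel is NONE.
5. The predecessor node has exactly one outgoing channel.
6. The node and its predecessor have the same parallelism.
7. The node has no broadcast inputs.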
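The seven conditions combine into a single boolean predicate. The sketch below is a hypothetical condensation, not Flink code. Each parameter stands in for one of the calls in createSingleInputVertex(): getPushChainDriverClass(), getShipStrategy(), getLocalStrategy(), getOutgoingChannels().size(), getParallelism() and getBroadcastInputs().

```java
// Hypothetical condensation of the chaining check in createSingleInputVertex().
final class ChainingRuleSketch {
    // Kinds of predecessor nodes; everything except REGULAR blocks chaining.
    enum PredKind { REGULAR, NARY_UNION, BULK_PARTIAL_SOLUTION, WORKSET, ITERATION }

    static boolean canChain(boolean hasPushChainDriver,  // condition 1
                            PredKind predKind,           // condition 2
                            boolean forwardShip,         // condition 3
                            boolean noLocalStrategy,     // condition 4
                            int predOutgoingChannels,    // condition 5
                            int parallelism,             // condition 6
                            int predParallelism,         // condition 6
                            boolean hasBroadcastInputs)  // condition 7
    {
        return hasPushChainDriver
                && predKind == PredKind.REGULAR
                && forwardShip
                && noLocalStrategy
                && predOutgoingChannels == 1
                && parallelism == predParallelism
                && !hasBroadcastInputs;
    }
}
```

In WordCount, the Map after the FlatMap satisfies every condition. The Reduce fails the FORWARD check because its input is hash-partitioned.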

```java
private JobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
    final String taskName = node.getNodeName();
    final DriverStrategy ds = node.getDriverStrategy();
    // check, whether chaining is possible
    boolean chaining;
    {
        Channel inConn = node.getInput();
        PlanNode pred = inConn.getSource();
        chaining = ds.getPushChainDriverClass() != null &&
                !(pred instanceof NAryUnionPlanNode) && // first op after union is stand-alone, because union is merged
                !(pred instanceof BulkPartialSolutionPlanNode) && // partial solution merges anyways
                !(pred instanceof WorksetPlanNode) && // workset merges anyways
                !(pred instanceof IterationPlanNode) && // cannot chain with iteration heads currently
                inConn.getShipStrategy() == ShipStrategyType.FORWARD &&
                inConn.getLocalStrategy() == LocalStrategy.NONE &&
                pred.getOutgoingChannels().size() == 1 &&
                node.getParallelism() == pred.getParallelism() &&
                node.getBroadcastInputs().isEmpty();
        // ...
    }
    final JobVertex vertex;
    final TaskConfig config;
    if (chaining) {
        vertex = null;
        config = new TaskConfig(new Configuration());
        this.chainedTasks.put(node, new TaskInChain(node, ds.getPushChainDriverClass(), config, taskName));
    } else {
        // create task vertex
        vertex = new JobVertex(taskName);
        vertex.setResources(node.getMinResources(), node.getPreferredResources());
        vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);
        config = new TaskConfig(vertex.getConfiguration());
        // The Driver is the core class through which the node processes its data
        config.setDriver(ds.getDriverClass());
    }
    // set user code
    config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
    config.setStubParameters(node.getProgramOperator().getParameters());
    // set the driver strategy
    config.setDriverStrategy(ds);
    for (int i = 0; i < ds.getNumRequiredComparators(); i++) {
        config.setDriverComparator(node.getComparator(i), i);
    }
    // assign memory, file-handles, etc.
    assignDriverResources(node, config);
    return vertex;
}
```
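Taken together, the three createXxxVertex() methods decide the vertex class and the invokable task class of every PlanNode that is not chained. The mapping can be summarized as below. This is a hypothetical sketch that returns class names as strings so it needs no Flink dependency. The parameter `inIterationOnDynamicPath` stands for `this.currentIteration != null && node.isOnDynamicPath()`.

```java
// Hypothetical summary of the vertex/task classes chosen for non-chained PlanNodes.
final class VertexKindSketch {
    enum PlanNodeType { SOURCE, SINK, SINGLE_INPUT }

    // Returns {vertex class, invokable task class}.
    static String[] vertexAndTask(PlanNodeType type, boolean inIterationOnDynamicPath) {
        switch (type) {
            case SOURCE:
                return new String[] {"InputFormatVertex", "DataSourceTask"};
            case SINK:
                return new String[] {"OutputFormatVertex", "DataSinkTask"};
            case SINGLE_INPUT:
                return new String[] {"JobVertex",
                        inIterationOnDynamicPath ? "IterationIntermediateTask" : "BatchTask"};
            default:
                throw new IllegalArgumentException("Unrecognized node type: " + type);
        }
    }
}
```

A chained SingleInputPlanNode gets no vertex at all: createSingleInputVertex() returns null and records a TaskInChain instead.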

Calling JobGraphGenerator.preVisit() on the PlanNodes from bottom to top creates all the JobVertex instances.

Connecting the JobVertex instances

Next comes JobGraphGenerator.postVisit(), which is called from top to bottom (source to sink). It works roughly as follows:

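1. For a source node, it does nothing and returns immediately.
2. If the PlanNode is a chained node, chained after the head node of some JobVertex, it records the JobVertex the node belongs to.
3. If the PlanNode is the head node of a JobVertex, it connects that JobVertex to its upstream JobVertex instances and creates the JobEdges and IntermediateDataSets along the way.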

```java
public void postVisit(PlanNode node) {
    try {
        // Source nodes (as well as unions and solution sets) need no connection: return directly
        if (node instanceof SourcePlanNode || node instanceof NAryUnionPlanNode || node instanceof SolutionSetPlanNode) {
            return;
        }
        // check if we have an iteration. in that case, translate the step function now
        // ...
        final JobVertex targetVertex = this.vertices.get(node);
        // check whether this node has its own task, or is merged with another one
        // targetVertex == null means the PlanNode has no vertex of its own, i.e. it is chained
        if (targetVertex == null) {
            // node's task is merged with another task. it is either chained, or a merged head vertex
            // from an iteration
            final TaskInChain chainedTask;
            if ((chainedTask = this.chainedTasks.get(node)) != null) {
                // Chained Task. Sanity check first...
                final Iterator<Channel> inConns = node.getInputs().iterator();
                if (!inConns.hasNext()) {
                    throw new CompilerException("Bug: Found chained task with no input.");
                }
                final Channel inConn = inConns.next();
                // ...
                JobVertex container = chainedTask.getContainingVertex();
                if (container == null) {
                    final PlanNode sourceNode = inConn.getSource();
                    container = this.vertices.get(sourceNode);
                    if (container == null) {
                        // predecessor is itself chained
                        container = this.chainedTasks.get(sourceNode).getContainingVertex();
                        if (container == null) {
                            throw new IllegalStateException("Bug: Chained task predecessor has not been assigned its containing vertex.");
                        }
                    } else {
                        // predecessor is a proper task job vertex and this is the first chained task. add a forward connection entry.
                        new TaskConfig(container.getConfiguration()).addOutputShipStrategy(ShipStrategyType.FORWARD);
                    }
                    // Record the JobVertex that this chained node belongs to
                    chainedTask.setContainingVertex(container);
                }
                // add info about the input serializer type
                chainedTask.getTaskConfig().setInputSerializer(inConn.getSerializer(), 0);
                // update name of container task
                String containerTaskName = container.getName();
                if (containerTaskName.startsWith("CHAIN ")) {
                    container.setName(containerTaskName + " -> " + chainedTask.getTaskName());
                } else {
                    container.setName("CHAIN " + containerTaskName + " -> " + chainedTask.getTaskName());
                }
                // update resource of container task
                container.setResources(container.getMinResources().merge(node.getMinResources()),
                        container.getPreferredResources().merge(node.getPreferredResources()));
                this.chainedTasksInSequence.add(chainedTask);
                return;
            }
            else if (node instanceof BulkPartialSolutionPlanNode ||
                    node instanceof WorksetPlanNode)
            {
                // merged iteration head task. the task that the head is merged with will take care of it
                return;
            } else {
                throw new CompilerException("Bug: Unrecognized merged task vertex.");
            }
        }
        // ...
        // The code below handles PlanNodes that own a JobVertex, i.e. the head node of a JobVertex
        // create the config that will contain all the description of the inputs
        final TaskConfig targetVertexConfig = new TaskConfig(targetVertex.getConfiguration());
        // get the inputs. if this node is the head of an iteration, we obtain the inputs from the
        // enclosing iteration node, because the inputs are the initial inputs to the iteration.
        final Iterator<Channel> inConns;
        if (node instanceof BulkPartialSolutionPlanNode) {
            // ...
        } else if (node instanceof WorksetPlanNode) {
            // ...
        } else {
            inConns = node.getInputs().iterator();
        }
        // ...
        int inputIndex = 0;
        while (inConns.hasNext()) {
            Channel input = inConns.next();
            // translateChannel() connects the two JobVertex instances and creates the JobEdge and IntermediateDataSet
            inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
        }
        // broadcast variables
        // ...
    } catch (Exception e) {
        throw new CompilerException(
                "An error occurred while translating the optimized plan to a JobGraph: " + e.getMessage(), e);
    }
}
```
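The renaming in postVisit() produces vertex names such as `CHAIN DataSource -> FlatMap -> ...`. A stand-alone copy of just that string logic is shown below; the helper names are hypothetical.

```java
// Stand-alone copy of the container renaming rule in postVisit(); helper names are hypothetical.
final class ChainNameSketch {
    // The first chained task adds the "CHAIN " prefix; later ones are appended with " -> ".
    static String appendChained(String containerName, String chainedTaskName) {
        if (containerName.startsWith("CHAIN ")) {
            return containerName + " -> " + chainedTaskName;
        }
        return "CHAIN " + containerName + " -> " + chainedTaskName;
    }

    // Applies the rule once per chained task, in source-to-sink order (as postVisit() does).
    static String chainName(String head, String... chained) {
        String name = head;
        for (String t : chained) {
            name = appendChained(name, t);
        }
        return name;
    }
}
```

`chainName("DataSource", "FlatMap", "Map", "Combine (SUM(1))")` yields `CHAIN DataSource -> FlatMap -> Map -> Combine (SUM(1))`. A vertex with no chained tasks keeps its plain name.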

The key part is how two vertices are connected, which happens in translateChannel(). The call chain is:

JobGraphGenerator.translateChannel() --> JobGraphGenerator.connectJobVertices() --> JobVertex.connectNewDataSetAsInput(). This chain connects the two JobVertex instances and creates the JobEdge and the intermediate result (IntermediateDataSet) between them.

  1. //JobGraphGenerator类
  2. private int translateChannel(Channel input, int inputIndex, JobVertex targetVertex,
  3. TaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
  4. {
  5. final PlanNode inputPlanNode = input.getSource();
  6. final Iterator<Channel> allInChannels;
  7. if (inputPlanNode instanceof NAryUnionPlanNode) {
  8. ...
  9. } else {
  10. allInChannels = Collections.singletonList(input).iterator();
  11. }
  12. ...
  13. // expand the channel to all the union channels, in case there is a union operator at its source
  14. while (allInChannels.hasNext()) {
  15. final Channel inConn = allInChannels.next();
  16. ...
  17. final PlanNode sourceNode = inConn.getSource();
  18. JobVertex sourceVertex = this.vertices.get(sourceNode);
  19. TaskConfig sourceVertexConfig;
  20. if (sourceVertex == null) {
  21. // this predecessor is chained to another task or an iteration
  22. //这种情况下sourceNode是一个被chain的节点,不是JobVertex的头节点。这时候获取它属于的那个JobVertex
  23. final TaskInChain chainedTask;
  24. final IterationDescriptor iteration;
  25. if ((chainedTask = this.chainedTasks.get(sourceNode)) != null) {
  26. // push chained task
  27. if (chainedTask.getContainingVertex() == null) {
  28. throw new IllegalStateException("Bug: Chained task has not been assigned its containing vertex when connecting.");
  29. }
  30. sourceVertex = chainedTask.getContainingVertex();
  31. sourceVertexConfig = chainedTask.getTaskConfig();
  32. } else if ((iteration = this.iterations.get(sourceNode)) != null) {
  33. // predecessor is an iteration
              sourceVertex = iteration.getHeadTask();
              sourceVertexConfig = iteration.getHeadFinalResultConfig();
          } else {
              throw new CompilerException("Bug: Could not resolve source node for a channel.");
          }
      } else {
          // predecessor is its own vertex
          sourceVertexConfig = new TaskConfig(sourceVertex.getConfiguration());
      }
      // connect the two JobVertex instances
      DistributionPattern pattern = connectJobVertices(
              inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
      ...
      // the local strategy is added only once. in non-union case that is the actual edge,
      // in the union case, it is the edge between union and the target node
      addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast);
      return 1;
  }
  // JobGraphGenerator class
  private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
          final JobVertex sourceVertex, final TaskConfig sourceConfig,
          final JobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
          throws CompilerException
  {
      // ------------ connect the vertices to the job graph --------------
      final DistributionPattern distributionPattern;
      switch (channel.getShipStrategy()) {
          case FORWARD:
              distributionPattern = DistributionPattern.POINTWISE;
              break;
          case PARTITION_RANDOM:
          case BROADCAST:
          case PARTITION_HASH:
          case PARTITION_CUSTOM:
          case PARTITION_RANGE:
          case PARTITION_FORCED_REBALANCE:
              distributionPattern = DistributionPattern.ALL_TO_ALL;
              break;
          default:
              throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
      }
      // resultType determines the type of the ResultPartition: PIPELINED or BLOCKING (BLOCKING spills data to disk)
      final ResultPartitionType resultType;
      switch (channel.getDataExchangeMode()) {
          case PIPELINED:
              resultType = ResultPartitionType.PIPELINED;
              break;
          case BATCH:
              // BLOCKING results are currently not supported in closed loop iterations
              //
              // See https://issues.apache.org/jira/browse/FLINK-1713 for details
              resultType = channel.getSource().isOnDynamicPath()
                      ? ResultPartitionType.PIPELINED
                      : ResultPartitionType.BLOCKING;
              break;
          case PIPELINE_WITH_BATCH_FALLBACK:
              throw new UnsupportedOperationException("Data exchange mode " +
                      channel.getDataExchangeMode() + " currently not supported.");
          default:
              throw new UnsupportedOperationException("Unknown data exchange mode.");
      }
      // the JobEdge and the IntermediateDataSet are created here
      JobEdge edge = targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
      // -------------- configure the source task's ship strategy strategies in task config --------------
      ...
      return distributionPattern;
  }
  // JobVertex class
  public JobEdge connectNewDataSetAsInput(
          JobVertex input,
          DistributionPattern distPattern,
          ResultPartitionType partitionType) {
      IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
      JobEdge edge = new JobEdge(dataSet, this, distPattern);
      this.inputs.add(edge);
      dataSet.addConsumer(edge);
      return edge;
  }
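The two `switch` statements in `connectJobVertices()` are a pure mapping: ship strategy to distribution pattern, and data exchange mode to result partition type. The self-contained Java sketch below isolates just that mapping. The enums are hypothetical stand-ins that reuse Flink's constant names but are not Flink's classes, and the `onDynamicPath` flag stands in for `channel.getSource().isOnDynamicPath()`.

```java
// Hypothetical stand-ins for Flink's ShipStrategyType, DistributionPattern,
// DataExchangeMode and ResultPartitionType: same constant names, not the real classes.
enum ShipStrategy {
    FORWARD, PARTITION_RANDOM, BROADCAST, PARTITION_HASH,
    PARTITION_CUSTOM, PARTITION_RANGE, PARTITION_FORCED_REBALANCE
}

enum Distribution { POINTWISE, ALL_TO_ALL }

enum ExchangeMode { PIPELINED, BATCH, PIPELINE_WITH_BATCH_FALLBACK }

enum PartitionType { PIPELINED, BLOCKING }

class EdgeTypeMapping {
    // FORWARD maps to a pointwise connection; every repartitioning strategy
    // needs each producer subtask to be able to reach each consumer subtask.
    static Distribution distributionPattern(ShipStrategy strategy) {
        switch (strategy) {
            case FORWARD:
                return Distribution.POINTWISE;
            case PARTITION_RANDOM:
            case BROADCAST:
            case PARTITION_HASH:
            case PARTITION_CUSTOM:
            case PARTITION_RANGE:
            case PARTITION_FORCED_REBALANCE:
                return Distribution.ALL_TO_ALL;
            default:
                throw new IllegalArgumentException("Unknown ship strategy: " + strategy);
        }
    }

    // BATCH becomes BLOCKING unless the producer is on the dynamic path of an
    // iteration, where blocking results are not supported (FLINK-1713).
    static PartitionType resultType(ExchangeMode mode, boolean onDynamicPath) {
        switch (mode) {
            case PIPELINED:
                return PartitionType.PIPELINED;
            case BATCH:
                return onDynamicPath ? PartitionType.PIPELINED : PartitionType.BLOCKING;
            default:
                throw new UnsupportedOperationException(
                        "Data exchange mode " + mode + " currently not supported.");
        }
    }

    public static void main(String[] args) {
        System.out.println(distributionPattern(ShipStrategy.PARTITION_HASH)); // ALL_TO_ALL
        System.out.println(resultType(ExchangeMode.BATCH, false));            // BLOCKING
    }
}
```

Keeping the mapping in two small pure functions like this makes each rule easy to check in isolation, independent of the optimizer classes that produce the `Channel`.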

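Once JobGraphGenerator.postVisit() has run for every PlanNode, all JobEdges and intermediate results (IntermediateDataSet) have been created, and the JobGraph is essentially complete. At this point the program.accept() call has also returned.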
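The object wiring performed by `JobVertex.connectNewDataSetAsInput()` can be modeled in a few lines: the producer vertex creates a new intermediate result, a new edge points from that result to the consumer, and both sides keep a reference. The classes below (`Vertex`, `ResultDataSet`, `Edge`, `Distribution`) are simplified, hypothetical stand-ins for `JobVertex`, `IntermediateDataSet`, `JobEdge` and `DistributionPattern`, not Flink's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for JobVertex, IntermediateDataSet and JobEdge.
enum Distribution { POINTWISE, ALL_TO_ALL }

class Vertex {
    final String name;
    final List<ResultDataSet> producedDataSets = new ArrayList<>(); // results this vertex writes
    final List<Edge> inputs = new ArrayList<>();                    // edges this vertex reads from

    Vertex(String name) { this.name = name; }

    // Producer side: create a new intermediate result owned by this vertex.
    ResultDataSet createAndAddResultDataSet() {
        ResultDataSet dataSet = new ResultDataSet(this);
        producedDataSets.add(dataSet);
        return dataSet;
    }

    // Same shape as connectNewDataSetAsInput: 'input' produces a new data set,
    // and this (consuming) vertex attaches an edge that reads from it.
    Edge connectNewDataSetAsInput(Vertex input, Distribution pattern) {
        ResultDataSet dataSet = input.createAndAddResultDataSet();
        Edge edge = new Edge(dataSet, this, pattern);
        this.inputs.add(edge);
        dataSet.consumers.add(edge);
        return edge;
    }
}

class ResultDataSet {
    final Vertex producer;
    final List<Edge> consumers = new ArrayList<>();

    ResultDataSet(Vertex producer) { this.producer = producer; }
}

class Edge {
    final ResultDataSet source;
    final Vertex target;
    final Distribution pattern;

    Edge(ResultDataSet source, Vertex target, Distribution pattern) {
        this.source = source;
        this.target = target;
        this.pattern = pattern;
    }
}
```

Connecting three vertices in a row this way (source, reduce, sink) yields one data set and one edge between each pair of neighbours, the same vertex / IntermediateDataSet / JobEdge alternation as in the WordCount topology of this section.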

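Back in JobGraphGenerator.compileJobGraph(): after program.accept() returns, the information about the nodes to be chained is added to the configuration of the JobVertex each of them belongs to. Then a JobGraph instance is created, and every JobVertex created during program.accept() is added to it. At this point the JobGraph is complete.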
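These finishing steps (write each chained node into its containing vertex, then create the graph and add every vertex) can be sketched as follows. `SimpleVertex`, `ChainedTask`, `SimpleJobGraph` and `GraphAssembler` are hypothetical simplifications; in Flink the chained-node information is written into the vertex's configuration, which this sketch replaces with a plain list of names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of a JobVertex; the list stands in for its configuration.
class SimpleVertex {
    final String name;
    final List<String> chainedTasks = new ArrayList<>();

    SimpleVertex(String name) { this.name = name; }
}

// Hypothetical chained node together with the vertex it was assigned to during postVisit().
class ChainedTask {
    final String name;
    final SimpleVertex containingVertex;

    ChainedTask(String name, SimpleVertex containingVertex) {
        this.name = name;
        this.containingVertex = containingVertex;
    }
}

class SimpleJobGraph {
    final String jobName;
    final Map<String, SimpleVertex> vertices = new LinkedHashMap<>(); // keeps insertion order

    SimpleJobGraph(String jobName) { this.jobName = jobName; }

    void addVertex(SimpleVertex vertex) { vertices.put(vertex.name, vertex); }
}

class GraphAssembler {
    // 1) record every chained task in its containing vertex, in chain order;
    // 2) create the graph and add all vertices collected during the traversal.
    static SimpleJobGraph assemble(String jobName, List<SimpleVertex> vertices,
                                   List<ChainedTask> chainedTasksInSequence) {
        for (ChainedTask task : chainedTasksInSequence) {
            task.containingVertex.chainedTasks.add(task.name);
        }
        SimpleJobGraph graph = new SimpleJobGraph(jobName);
        for (SimpleVertex vertex : vertices) {
            graph.addVertex(vertex);
        }
        return graph;
    }
}
```

Fed with WordCount-like input (FlatMap, Map and Combine chained into the DataSource vertex, plus separate Reduce and DataSink vertices), the assembled graph holds three vertices, only the first of which carries chained tasks.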

For WordCount, the resulting JobGraph topology is:

  InputFormatVertex(CHAIN DataSource -> FlatMap -> Map -> Combine (SUM(1)))
  --> IntermediateDataSet --> JobEdge -->
  JobVertex(Reduce (SUM(1))) --> IntermediateDataSet --> JobEdge --> InputFormatVertex(DataSink (TextOutputFormat))

13. Summary

Using WordCount as the example, the overall steps for creating the JobGraph are:

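1. Building the user program creates many DataSets: every transformation such as map, filter or reduce creates a new DataSet, and each DataSet is linked to the next one as its input. The initial state of the WordCount program is: DataSource --> FlatMapOperator --> MapOperator --> ScalaAggregateOperator --> DataSink. Note that these Operator classes are not operator-level operators but DataSet-level ones: all of them except DataSink are subclasses of DataSet.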
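The linked structure of step 1 can be illustrated with a toy class in which every transformation returns a new node that keeps a reference to its input. `ToyDataSet` and its methods are hypothetical: they only mirror the shape of the DataSet API (and, unlike Flink, model the sink with the same type for brevity) and do not process any data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model: each transformation creates a new node pointing at its input,
// the way FlatMapOperator, MapOperator, ... each wrap the previous DataSet.
class ToyDataSet {
    final String kind;
    final ToyDataSet input; // null for a data source

    private ToyDataSet(String kind, ToyDataSet input) {
        this.kind = kind;
        this.input = input;
    }

    static ToyDataSet source() { return new ToyDataSet("DataSource", null); }
    ToyDataSet flatMap()       { return new ToyDataSet("FlatMapOperator", this); }
    ToyDataSet map()           { return new ToyDataSet("MapOperator", this); }
    ToyDataSet groupSum()      { return new ToyDataSet("ScalaAggregateOperator", this); }
    // Simplification: in Flink a DataSink is not a DataSet.
    ToyDataSet sink()          { return new ToyDataSet("DataSink", this); }

    // Walk from this node back to the source, then return the chain in source-to-sink order.
    List<String> lineage() {
        Deque<String> stack = new ArrayDeque<>();
        for (ToyDataSet node = this; node != null; node = node.input) {
            stack.push(node.kind);
        }
        return new ArrayList<>(stack);
    }
}
```

Calling `ToyDataSet.source().flatMap().map().groupSum().sink().lineage()` walks the input references back from the sink and lists the same five stages as the WordCount chain in step 1.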

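2. Create the execution Plan. The DataSets from step 1 are translated into operator-level Operators. Roughly, the translation starts at the sinks and recurses upstream depth-first, so the source is translated first and then each downstream node in turn. Each node is translated by calling the translateToDataFlow() method of the DataSet (or DataSink), and the already translated upstream Operator becomes the input of the next Operator. For WordCount, the resulting operator-level Operators are: GenericDataSourceBase --> FlatMapOperatorBase --> MapOperatorBase --> GroupReduceOperatorBase --> GenericDataSinkBase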
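The sink-first, depth-first recursion of step 2 can be sketched as below. `ApiNode`, `OperatorNode` and `SinkFirstTranslator` are hypothetical; each `ApiNode` simply carries the name of the operator it translates into, standing in for `translateToDataFlow()`. The `translated` cache is an assumption of this sketch, added so that a data set consumed by several downstream nodes is translated only once.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical API-level node (a DataSet or a sink) with at most one input.
class ApiNode {
    final String name;
    final String operatorName; // what this node translates into (stand-in for translateToDataFlow())
    final ApiNode input;

    ApiNode(String name, String operatorName, ApiNode input) {
        this.name = name;
        this.operatorName = operatorName;
        this.input = input;
    }
}

// Hypothetical operator-level node produced by the translation.
class OperatorNode {
    final String name;
    final OperatorNode input;

    OperatorNode(String name, OperatorNode input) {
        this.name = name;
        this.input = input;
    }
}

class SinkFirstTranslator {
    private final Map<ApiNode, OperatorNode> translated = new IdentityHashMap<>(); // assumption: memoization
    final List<String> order = new ArrayList<>(); // order in which operators are created

    OperatorNode translate(ApiNode node) {
        OperatorNode done = translated.get(node);
        if (done != null) {
            return done;
        }
        // Depth-first: translate the input before the node itself, so the source comes first.
        OperatorNode in = node.input == null ? null : translate(node.input);
        OperatorNode op = new OperatorNode(node.operatorName, in);
        translated.put(node, op);
        order.add(op.name);
        return op;
    }
}
```

Calling `translate()` on the WordCount sink creates the operators in source-to-sink order, even though the recursion starts at the sink.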

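3. The optimizer optimizes the Plan and compiles it into an OptimizedPlan. First, a GraphCreatingVisitor converts each operator of the original Plan into an OptimizerNode. OptimizerNodes are connected by DagConnections; a DagConnection is an edge model that links two nodes. The OptimizerNodes are created through Plan.accept(): starting from the sink (GenericDataSinkBase), visitor.preVisit() is called on each operator bottom-up to create its OptimizerNode, and then visitor.postVisit() is called on each operator top-down to connect the OptimizerNodes. The whole WordCount plan becomes the following topology: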

DataSourceNode --> FlatMapNode --> MapNode --> GroupReduceNode --> DataSinkNode

Adjacent OptimizerNodes are connected by DagConnections.
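The accept()/preVisit()/postVisit() traversal used in step 3 (and again in step 5) is the visitor pattern. The minimal sketch below uses hypothetical `PlanElement`, `PlanVisitor` and `OrderRecordingVisitor` types rather than Flink's own visitor interfaces, and shows why preVisit() runs from the sink upward while postVisit() runs from the source downward. Skipping elements that were already visited is this sketch's choice, made so that a node shared by several consumers is handled once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical visitor with the preVisit/postVisit pair described in the text.
interface PlanVisitor {
    boolean preVisit(PlanElement element); // false: skip this element's inputs and its postVisit
    void postVisit(PlanElement element);
}

class PlanElement {
    final String name;
    final List<PlanElement> inputs;

    PlanElement(String name, PlanElement... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    // preVisit() fires on the way up (sink -> source), postVisit() on the way back (source -> sink).
    void accept(PlanVisitor visitor) {
        if (visitor.preVisit(this)) {
            for (PlanElement input : inputs) {
                input.accept(visitor);
            }
            visitor.postVisit(this);
        }
    }
}

// Records both orders. A real visitor would create a node in preVisit() and
// connect it to its inputs in postVisit(), when those inputs already exist.
class OrderRecordingVisitor implements PlanVisitor {
    final List<String> preOrder = new ArrayList<>();
    final List<String> postOrder = new ArrayList<>();
    private final Set<PlanElement> seen = new HashSet<>();

    @Override
    public boolean preVisit(PlanElement element) {
        if (!seen.add(element)) {
            return false; // already reached through another consumer
        }
        preOrder.add(element.name);
        return true;
    }

    @Override
    public void postVisit(PlanElement element) {
        postOrder.add(element.name);
    }
}
```

For the WordCount chain, `preOrder` lists DataSink first and DataSource last, and `postOrder` lists them the other way round, which is why nodes can be wired to their inputs in postVisit().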

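4. The OptimizerNodes are further compiled into PlanNodes and wrapped in an OptimizedPlan. The code is in OptimizerNode.getAlternativePlans(), another recursive traversal, this time post-order, which creates the PlanNodes and Channels from top (source) to bottom (sink). PlanNodes are connected by Channels; like a DagConnection, a Channel is an edge model linking two nodes. Many optimizations happen here: for example, a combiner node is added for a GroupReduceNode, and each Channel is assigned a ShipStrategyType and a DataExchangeMode. ShipStrategyType is the strategy for shipping data between two nodes, such as hash partitioning or range partitioning. DataExchangeMode is the mode of data exchange between two nodes, PIPELINED or BATCH (the enum also has PIPELINE_WITH_BATCH_FALLBACK, which connectJobVertices() rejects as unsupported). In PIPELINED mode data flows like a pipeline, so upstream and downstream tasks produce and consume data at the same time; in BATCH mode the downstream task starts only after the upstream task has processed all of its data, and the intermediate data is spilled to disk. The WordCount topology now looks like this: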

SourcePlanNode --> SingleInputPlanNode(FlatMap) --> SingleInputPlanNode(Map) --> SingleInputPlanNode(GroupCombine) --> SingleInputPlanNode(GroupReduce) --> SinkPlanNode

The PlanNodes are connected by Channels.
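Step 4's post-order recursion, including the combiner added for the group reduce, can be sketched as follows. This is a heavily simplified, hypothetical model (`OptNode`, `SimplePlanNode`, `PostOrderPlanner` are not Flink classes): the real getAlternativePlans() produces candidate plans per node and chooses among them, whereas this sketch builds exactly one plan and hard-codes two assumptions, a hash-partitioned channel into the reducer and FORWARD channels everywhere else.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical optimizer-level node (stand-in for an OptimizerNode) with a single input.
class OptNode {
    final String kind; // "Source", "FlatMap", "Map", "GroupReduce" or "Sink"
    final OptNode input;

    OptNode(String kind, OptNode input) {
        this.kind = kind;
        this.input = input;
    }
}

// Hypothetical plan node; 'inShip' is the ship strategy of the channel from its input.
class SimplePlanNode {
    final String name;
    final SimplePlanNode input;
    final String inShip; // null for the source

    SimplePlanNode(String name, SimplePlanNode input, String inShip) {
        this.name = name;
        this.input = input;
        this.inShip = inShip;
    }
}

class PostOrderPlanner {
    // Post-order: build the input's plan first, then the plan node for this node.
    static SimplePlanNode plan(OptNode node) {
        if (node.input == null) {
            return new SimplePlanNode("SourcePlanNode", null, null);
        }
        SimplePlanNode in = plan(node.input);
        if (node.kind.equals("GroupReduce")) {
            // Assumption of this sketch: combine locally, then hash-partition to the reducer.
            SimplePlanNode combine = new SimplePlanNode("SingleInputPlanNode(GroupCombine)", in, "FORWARD");
            return new SimplePlanNode("SingleInputPlanNode(GroupReduce)", combine, "PARTITION_HASH");
        }
        String name = node.kind.equals("Sink") ? "SinkPlanNode" : "SingleInputPlanNode(" + node.kind + ")";
        return new SimplePlanNode(name, in, "FORWARD");
    }

    // Flatten the plan in source-to-sink order, prefixing each node with its incoming channel.
    static List<String> describe(SimplePlanNode sink) {
        List<String> out = new ArrayList<>();
        for (SimplePlanNode p = sink; p != null; p = p.input) {
            out.add(0, p.inShip == null ? p.name : p.inShip + " -> " + p.name);
        }
        return out;
    }
}
```

Planning the WordCount chain with this sketch yields the same six-node sequence as the topology above, with the only non-FORWARD channel between GroupCombine and GroupReduce (by assumption).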

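5. Create the JobGraph. Once the OptimizedPlan from step 4 is complete, JobGraphGenerator compiles it into a JobGraph. The core code is OptimizedPlan.accept(jobGraphGenerator). The approach is similar to step 3: JobGraphGenerator.preVisit() is first called bottom-up (from SinkPlanNode to SourcePlanNode), then JobGraphGenerator.postVisit() is called top-down (from SourcePlanNode to SinkPlanNode). preVisit() creates the JobVertex instances and records the nodes that are to be chained together. postVisit() connects pairs of JobVertex instances, creating the JobEdges and intermediate results (IntermediateDataSet), and assigns each chained node the JobVertex it belongs to.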
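Step 5 groups nodes into JobVertex instances by deciding, per node, whether it can be chained to its predecessor. The sketch below uses a deliberately simplified, hypothetical rule (chain only across a FORWARD channel with equal parallelism, and never chain a sink); the actual conditions checked in JobGraphGenerator.preVisit() are not reproduced here. `PipelineNode` and `ChainingSketch` are illustrative names, and the example assumes the combiner-to-reducer channel is hash-partitioned.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical node of a linear plan, listed from source to sink.
class PipelineNode {
    final String name;
    final String inShip;   // ship strategy of the incoming channel; null for the source
    final int parallelism;
    final boolean isSink;

    PipelineNode(String name, String inShip, int parallelism, boolean isSink) {
        this.name = name;
        this.inShip = inShip;
        this.parallelism = parallelism;
        this.isSink = isSink;
    }
}

class ChainingSketch {
    // Simplified rule (assumption): FORWARD channel, same parallelism, and not a sink.
    static boolean canChain(PipelineNode prev, PipelineNode node) {
        return prev != null
                && "FORWARD".equals(node.inShip)
                && node.parallelism == prev.parallelism
                && !node.isSink;
    }

    // Returns one list per vertex: the head node first, followed by the nodes chained into it.
    static List<List<String>> buildVertices(List<PipelineNode> pipeline) {
        List<List<String>> vertices = new ArrayList<>();
        PipelineNode prev = null;
        for (PipelineNode node : pipeline) {
            if (canChain(prev, node)) {
                vertices.get(vertices.size() - 1).add(node.name);
            } else {
                List<String> vertex = new ArrayList<>();
                vertex.add(node.name);
                vertices.add(vertex);
            }
            prev = node;
        }
        return vertices;
    }
}
```

With WordCount-like input this rule produces three vertices, [DataSource, FlatMap, Map, Combine], [Reduce] and [DataSink], the same grouping as the JobGraph topology shown in step 6.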

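6. After postVisit() has finished, all JobVertex instances exist, and so do all JobEdges and IntermediateDataSets. Next, the nodes to be chained are added to the configuration of their JobVertex, a JobGraph instance is created, and all JobVertex instances are added to it, which completes the JobGraph. For WordCount, the resulting JobGraph topology is: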

InputFormatVertex(CHAIN DataSource -> FlatMap -> Map -> Combine (SUM(1)))

--> IntermediateDataSet --> JobEdge -->

JobVertex(Reduce (SUM(1))) --> IntermediateDataSet --> JobEdge --> InputFormatVertex(DataSink (TextOutputFormat))
