【FLink】Flink SQL代码生成与UDF重复调用的优化

x33g5p2x  于2022-03-28 转载在 Flink  
字(15.4k)|赞(0)|评价(0)|浏览(728)

1.概述

转载:Flink SQL代码生成与UDF重复调用的优化

2. 代码生成简介

代码生成(code generation)是当今各种数据库和数据处理引擎广泛采用的物理执行层技术之一。通过代码生成,可以将原本需要解释执行的算子逻辑转为编译执行(二进制代码),充分利用JIT编译的优势,克服传统Volcano模型虚函数调用过多、对寄存器不友好的缺点,在CPU-bound场景下可以获得大幅的性能提升。

在大数据领域,看官最为熟知的代码生成应用可能就是Spark 2.x的全阶段代码生成(whole-stage code generation)机制,它也是笔者两年前介绍过的Tungsten Project的一部分。以常见的FILTER -> JOIN -> AGGREGATE流程为例,全阶段代码生成只需2个Stage,而传统Volcano模型则需要9次虚函数调用,如下图所示。

关于Spark的代码生成,可以参考其源码或DataBricks的说明文章,不再赘述。而Flink作为后起之秀,在Flink SQL (Blink Planner)中也采用了类似的思路。本文就来做个quick tour,并提出一个小而有用的优化。

3.1 CodeGeneratorContext

顾名思义,CodeGeneratorContext就是代码生成器的上下文,且同一个CodeGeneratorContext实例在相互有关联的代码生成器之间可以共享。它的作用就是维护代码生成过程中的各种能够重复使用的逻辑,包括且不限于:

  1. 对象引用
  2. 构造代码、初始化代码
  3. 常量、成员变量、局部变量、时间变量
  4. 函数体(即Flink Function)及其配套(open()/close()等等)
  5. 类型序列化器
  6. etc.

具体代码暂时不贴,以下是该类的部分结构。

3.2 CodeGenerator

Blink Planner的代码生成器并没有统一的基类。它们的共同点就是类名大多以CodeGenerator为后缀,并且绝大多数都要与CodeGeneratorContext打交道。它们的类名也都比较self-explanatory,如下图所示。注意笔者使用的是Flink 1.13版本,所以其中还混杂着少量Old Planner的内容,可以无视之。

挑选几个在流计算场景下比较重点的,稍微解释一下。

AggsHandlerCodeGenerator——负责生成普通聚合函数

AggsHandleFunction与带命名空间(即窗口语义)的聚合函数NamespaceAggsHandleFunction。注意它们与DataStream API中的聚合函数AggregateFunction不是一回事,但大致遵循同样的规范。

CollectorCodeGenerator——负责生成Collector,即算子内将流数据向下游发射的组件。看官用过DataStream API的话会很熟悉。

ExprCodeGenerator——负责根据Calcite RexNode生成各类表达式,Planner内部用GeneratedExpression来表示。由于RexNode很多变(字面量、变量、函数调用等等),它巧妙地利用了RexVisitor通过访问者模式来将不同类型的RexNode翻译成对应的代码。

FunctionCodeGenerator——负责根据SQL逻辑生成各类函数,目前支持的有RichMapFunction、RichFlatMapFunction、RichFlatJoinFunction、RichAsyncFunction和ProcessFunction。
OperatorCodeGenerator——负责生成OneInputStreamOperator和TwoInputStreamOperator。

代码生成器一般会在物理执行节点(即ExecNode)内被调用,但不是所有的Flink SQL逻辑都会直接走代码生成,例如不久前讲过的Window TVF的切片化窗口以及内置的Top-N。

3.3 GeneratedClass

GeneratedClass用来描述代码生成器生成的各类实体,如函数、算子等,它们都位于Runtime层,类图如下。

注意这其中并不包括GeneratedExpression,因为表达式的概念仅在Planner层存在。

4.代码生成示例

笔者仅用一条极简的SQL语句

  1. SELECT COUNT(orderId) FROM rtdw_dwd.kafka_order_done_log
  2. WHERE mainSiteId = 10029

来简单走一下流程。

观察该语句生成的物理执行计划:

  1. == Optimized Execution Plan ==
  2. GroupAggregate(select=[COUNT(orderId) AS EXPR$0])
  3. +- Exchange(distribution=[single])
  4. +- Calc(select=[orderId], where=[(mainSiteId = 10029:BIGINT)])
  5. +- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, tss, tssDay, orderId, /* ... */])

在这四个ExecNode中,StreamExecCalc和StreamExecGroupAggregate会涉及代码生成。篇幅所限,本文只分析StreamExecCalc,它的主要代码由CalcCodeGenerator#generateProcessCode()方法生成,该方法全文如下。

  1. private[flink] def generateProcessCode(
  2. ctx: CodeGeneratorContext,
  3. inputType: RowType,
  4. outRowType: RowType,
  5. outRowClass: Class[_ <: RowData],
  6. projection: Seq[RexNode],
  7. condition: Option[RexNode],
  8. inputTerm: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
  9. collectorTerm: String = CodeGenUtils.DEFAULT_OPERATOR_COLLECTOR_TERM,
  10. eagerInputUnboxingCode: Boolean,
  11. retainHeader: Boolean = false,
  12. outputDirectly: Boolean = false,
  13. allowSplit: Boolean = false): String = {
  14. // according to the SQL standard, every table function should also be a scalar function
  15. // but we don't allow that for now
  16. projection.foreach(_.accept(ScalarFunctionsValidator))
  17. condition.foreach(_.accept(ScalarFunctionsValidator))
  18. val exprGenerator = new ExprCodeGenerator(ctx, false)
  19. .bindInput(inputType, inputTerm = inputTerm)
  20. val onlyFilter = projection.lengthCompare(inputType.getFieldCount) == 0 &&
  21. projection.zipWithIndex.forall { case (rexNode, index) =>
  22. rexNode.isInstanceOf[RexInputRef] && rexNode.asInstanceOf[RexInputRef].getIndex == index
  23. }
  24. def produceOutputCode(resultTerm: String): String = if (outputDirectly) {
  25. s"$collectorTerm.collect($resultTerm);"
  26. } else {
  27. s"${OperatorCodeGenerator.generateCollect(resultTerm)}"
  28. }
  29. def produceProjectionCode: String = {
  30. val projectionExprs = projection.map(exprGenerator.generateExpression)
  31. val projectionExpression = exprGenerator.generateResultExpression(
  32. projectionExprs,
  33. outRowType,
  34. outRowClass,
  35. allowSplit = allowSplit)
  36. val projectionExpressionCode = projectionExpression.code
  37. val header = if (retainHeader) {
  38. s"${projectionExpression.resultTerm}.setRowKind($inputTerm.getRowKind());"
  39. } else {
  40. ""
  41. }
  42. s"""
  43. |$header
  44. |$projectionExpressionCode
  45. |${produceOutputCode(projectionExpression.resultTerm)}
  46. |""".stripMargin
  47. }
  48. if (condition.isEmpty && onlyFilter) {
  49. throw new TableException("This calc has no useful projection and no filter. " +
  50. "It should be removed by CalcRemoveRule.")
  51. } else if (condition.isEmpty) { // only projection
  52. val projectionCode = produceProjectionCode
  53. s"""
  54. |${if (eagerInputUnboxingCode) ctx.reuseInputUnboxingCode() else ""}
  55. |$projectionCode
  56. |""".stripMargin
  57. } else {
  58. val filterCondition = exprGenerator.generateExpression(condition.get)
  59. // only filter
  60. if (onlyFilter) {
  61. s"""
  62. |${if (eagerInputUnboxingCode) ctx.reuseInputUnboxingCode() else ""}
  63. |${filterCondition.code}
  64. |if (${filterCondition.resultTerm}) {
  65. | ${produceOutputCode(inputTerm)}
  66. |}
  67. |""".stripMargin
  68. } else { // both filter and projection
  69. val filterInputCode = ctx.reuseInputUnboxingCode()
  70. val filterInputSet = Set(ctx.reusableInputUnboxingExprs.keySet.toSeq: _*)
  71. // if any filter conditions, projection code will enter an new scope
  72. val projectionCode = produceProjectionCode
  73. val projectionInputCode = ctx.reusableInputUnboxingExprs
  74. .filter(entry => !filterInputSet.contains(entry._1))
  75. .values.map(_.code).mkString("\n")
  76. s"""
  77. |${if (eagerInputUnboxingCode) filterInputCode else ""}
  78. |${filterCondition.code}
  79. |if (${filterCondition.resultTerm}) {
  80. | ${if (eagerInputUnboxingCode) projectionInputCode else ""}
  81. | $projectionCode
  82. |}
  83. |""".stripMargin
  84. }
  85. }
  86. }

从中可以看出明显的模拟拼接手写代码的过程。之前讲过,Calc就是Project和Filter的结合,该方法的入参中恰好包含了对应的RexNode:

  • projection——类型为RexInputRef,值为$3,即源表中index为3的列orderId。
  • condition——类型为RexCall,值为=($32, 10029),即mainSiteId = 10029的谓词。

接下来调用ExprCodeGenerator.generateExpression()方法,先生成condition对应的GeneratedExpression。借助访问者模式,会转到ExprCodeGenerator#visitCall()方法,最终生成带空值判断的完整代码。部分调用栈如下:

  1. generateCallWithStmtIfArgsNotNull:98, GenerateUtils$ (org.apache.flink.table.planner.codegen)
  2. generateCallIfArgsNotNull:67, GenerateUtils$ (org.apache.flink.table.planner.codegen)
  3. generateOperatorIfNotNull:2323, ScalarOperatorGens$ (org.apache.flink.table.planner.codegen.calls)
  4. generateComparison:577, ScalarOperatorGens$ (org.apache.flink.table.planner.codegen.calls)
  5. generateEquals:429, ScalarOperatorGens$ (org.apache.flink.table.planner.codegen.calls)
  6. generateCallExpression:630, ExprCodeGenerator (org.apache.flink.table.planner.codegen)
  7. visitCall:529, ExprCodeGenerator (org.apache.flink.table.planner.codegen)
  8. visitCall:56, ExprCodeGenerator (org.apache.flink.table.planner.codegen)
  9. accept:174, RexCall (org.apache.calcite.rex)
  10. generateExpression:155, ExprCodeGenerator (org.apache.flink.table.planner.codegen)
  11. generateProcessCode:173, CalcCodeGenerator$ (org.apache.flink.table.planner.codegen)
  12. generateCalcOperator:50, CalcCodeGenerator$ (org.apache.flink.table.planner.codegen)
  13. generateCalcOperator:-1, CalcCodeGenerator (org.apache.flink.table.planner.codegen)
  14. translateToPlanInternal:94, CommonExecCalc (org.apache.flink.table.planner.plan.nodes.exec.common)

结果如下。其中resultTerm是表达式结果字段,nullTerm是表达式是否为空的boolean字段。后面的编号是内置计数器的值,防止重复。

  1. GeneratedExpression(resultTerm = result$3, nullTerm = isNull$2, code =
  2. isNull$2 = isNull$1 || false;
  3. result$3 = false;
  4. if (!isNull$2) {
  5. result$3 = field$1 == ((long) 10029L);
  6. }
  7. , resultType = BOOLEAN, literalValue = None)

看官可能会觉得生成的代码比较冗长,有些东西没必要写。但是代码生成器的设计目标是兼顾通用性和稳定性,因此必须保证生成的代码在各种情况下都可以正确地运行。另外JVM也可以通过条件编译、公共子表达式消除、方法内联等优化手段生成最优的字节码,不用过于担心。

话说回来,上文中过滤条件的输入filterInputCode是如何通过CodeGeneratorContext#reuseInputUnboxingCode()重用的呢?别忘了$32也是一个RexInputRef,所以递归visit到它时会调用GenerateUtils#generateInputAccess()方法生成对应的代码,即:

  1. isNull$1 = in1.isNullAt(32);
  2. field$1 = -1L;
  3. if (!isNull$1) {
  4. field$1 = in1.getLong(32);
  5. }

将它拼在filterCondition的前面,完成。处理projection的流程类似,看官可套用上面的思路自行追踪,不再废话了。

主处理逻辑生成之后,还需要将它用Function或者Operator承载才能生效。Calc节点在执行层对应的是一个OneInputStreamOperator,由OperatorCodeGenerator#generateOneInputStreamOperator()负责。从它的代码可以看到更清晰的轮廓,如下。

  1. def generateOneInputStreamOperator[IN <: Any, OUT <: Any](
  2. ctx: CodeGeneratorContext,
  3. name: String,
  4. processCode: String,
  5. inputType: LogicalType,
  6. inputTerm: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
  7. endInputCode: Option[String] = None,
  8. lazyInputUnboxingCode: Boolean = false,
  9. converter: String => String = a => a): GeneratedOperator[OneInputStreamOperator[IN, OUT]] = {
  10. addReuseOutElement(ctx)
  11. val operatorName = newName(name)
  12. val abstractBaseClass = ctx.getOperatorBaseClass
  13. val baseClass = classOf[OneInputStreamOperator[IN, OUT]]
  14. val inputTypeTerm = boxedTypeTermForType(inputType)
  15. val (endInput, endInputImpl) = endInputCode match {
  16. case None => ("", "")
  17. case Some(code) =>
  18. (s"""
  19. |@Override
  20. |public void endInput() throws Exception {
  21. | ${ctx.reuseLocalVariableCode()}
  22. | $code
  23. |}
  24. """.stripMargin, s", ${className[BoundedOneInput]}")
  25. }
  26. val operatorCode =
  27. j"""
  28. public class $operatorName extends ${abstractBaseClass.getCanonicalName}
  29. implements ${baseClass.getCanonicalName}$endInputImpl {
  30. private final Object[] references;
  31. ${ctx.reuseMemberCode()}
  32. public $operatorName(
  33. Object[] references,
  34. ${className[StreamTask[_, _]]} task,
  35. ${className[StreamConfig]} config,
  36. ${className[Output[_]]} output,
  37. ${className[ProcessingTimeService]} processingTimeService) throws Exception {
  38. this.references = references;
  39. ${ctx.reuseInitCode()}
  40. this.setup(task, config, output);
  41. if (this instanceof ${className[AbstractStreamOperator[_]]}) {
  42. ((${className[AbstractStreamOperator[_]]}) this)
  43. .setProcessingTimeService(processingTimeService);
  44. }
  45. }
  46. @Override
  47. public void open() throws Exception {
  48. super.open();
  49. ${ctx.reuseOpenCode()}
  50. }
  51. @Override
  52. public void processElement($STREAM_RECORD $ELEMENT) throws Exception {
  53. $inputTypeTerm $inputTerm = ($inputTypeTerm) ${converter(s"$ELEMENT.getValue()")};
  54. ${ctx.reusePerRecordCode()}
  55. ${ctx.reuseLocalVariableCode()}
  56. ${if (lazyInputUnboxingCode) "" else ctx.reuseInputUnboxingCode()}
  57. $processCode
  58. }
  59. $endInput
  60. @Override
  61. public void close() throws Exception {
  62. super.close();
  63. ${ctx.reuseCloseCode()}
  64. }
  65. ${ctx.reuseInnerClassDefinitionCode()}
  66. }
  67. """.stripMargin
  68. LOG.debug(s"Compiling OneInputStreamOperator Code:\n$name")
  69. new GeneratedOperator(operatorName, operatorCode, ctx.references.toArray)
  70. }

仍然注意那些能够通过CodeGeneratorContext复用的内容,例如processElement()方法中的本地变量声明部分,可以通过reuseLocalVariableCode()取得。最终的生成结果比较冗长,看官可通过Pastebin的传送门查看,并与上面的框架对应。

  1. https://pastebin.com/sYKKGr5Q

另外,如果不想每次都通过Debug查看生成的代码,可在Log4j配置文件内加入以下两行。

  1. logger.codegen.name = org.apache.flink.table.runtime.generated
  2. logger.codegen.level = DEBUG

这样,在生成代码被编译的时候,就会输出其内容。当GeneratedClass被首次实例化时,就会调用Janino进行动态编译,并将结果缓存在一个内部Cache中,避免重复编译。可通过查看o.a.f.table.runtime.generated.CompileUtils及其上下文获得更多信息。

UDF重复调用的问题在某些情况下可能会对Flink SQL用户造成困扰,例如下面的SQL语句:

  1. SELECT
  2. mp['eventType'] AS eventType,
  3. mp['fromType'] AS fromType,
  4. mp['columnType'] AS columnType
  5. -- A LOT OF other columns...
  6. FROM (
  7. SELECT SplitQueryParamsAsMap(query_string) AS mp
  8. FROM rtdw_ods.kafka_analytics_access_log_app
  9. WHERE CHAR_LENGTH(query_string) > 1
  10. );

假设从Map中取N个key对应的value,自定义函数SplitQueryParamsAsMap就会被调用N次,这显然是不符合常理的——对于一个确定的输入query_string,该UDF的输出就是确定的,没有必要每次都调用。如果UDF包含计算密集型的逻辑,整个作业的性能就会受到很大影响。

如何解决呢?通过挖掘代码,可以得知源头在于Calcite重写查询时不会考虑函数的确定性(determinism),也就是说FunctionDefinition#isDeterministic()没有起到应有的作用。考虑到直接改动Calcite难度较大且容易引起兼容性问题,我们考虑在SQL执行前的最后一步——也就是代码生成阶段来施工。

观察调用UDF生成的代码,如下。

  1. externalResult$8 = (java.util.Map) function_com$sht$bigdata$rt$udf$scalar$SplitQueryParamsAsMap$5cccfdc891a58463898db753288ed577
  2. .eval(isNull$0 ? null : ((java.lang.String) converter$7.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$2)));
  3. isNull$10 = externalResult$8 == null;
  4. result$10 = null;
  5. if (!isNull$10) {
  6. result$10 = (org.apache.flink.table.data.MapData) converter$9.toInternalOrNull((java.util.Map) externalResult$8);
  7. }
  8. // ......
  9. externalResult$24 = (java.util.Map) function_com$sht$bigdata$rt$udf$scalar$SplitQueryParamsAsMap$5cccfdc891a58463898db753288ed577
  10. .eval(isNull$0 ? null : ((java.lang.String) converter$7.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$2)));
  11. isNull$25 = externalResult$24 == null;
  12. result$25 = null;
  13. if (!isNull$25) {
  14. result$25 = (org.apache.flink.table.data.MapData) converter$9.toInternalOrNull((java.util.Map) externalResult$24);
  15. }

因此,我们可以在UDF满足确定性的前提下,重用UDF表达式产生的结果,即形如externalResult$8的term。思路比较直接,首先在CodeGeneratorContext中添加可重用的UDF表达式及其result term的容器,以及对应的方法。代码如下。

  1. private val reusableScalarFuncExprs: mutable.Map[String, String] =
  2. mutable.Map[String, String]()
  3. private val reusableResultTerms: mutable.Map[String, String] =
  4. mutable.Map[String, String]()
  5. def addReusableScalarFuncExpr(code: String, term: String): Unit = {
  6. if (!reusableScalarFuncExprs.contains(code)) {
  7. reusableScalarFuncExprs.put(code, term)
  8. }
  9. }
  10. def addReusableResultTerm(term: String, originalTerm: String): Unit = {
  11. if (!reusableResultTerms.contains(term)) {
  12. reusableResultTerms.put(term, originalTerm);
  13. }
  14. }
  15. def reuseScalarFuncExpr(code: String) : String = {
  16. reusableScalarFuncExprs.getOrElse(code, code)
  17. }
  18. def reuseResultTerm(term: String) : String = {
  19. reusableResultTerms.getOrElse(term, term)
  20. }

注意在保存UDF表达式时,是以生成的代码为key,result term为value。保存result term的映射时,是以新的为key,旧的为value。

然后从ExprCodeGenerator入手(函数调用都属于RexCall),找到UDF代码生成的方法,即BridgingFunctionGenUtil#generateScalarFunctionCall(),做如下改动。

  1. private def generateScalarFunctionCall(
  2. ctx: CodeGeneratorContext,
  3. functionTerm: String,
  4. externalOperands: Seq[GeneratedExpression],
  5. outputDataType: DataType,
  6. isDeterministic: Boolean)
  7. : GeneratedExpression = {
  8. // result conversion
  9. val externalResultClass = outputDataType.getConversionClass
  10. val externalResultTypeTerm = typeTerm(externalResultClass)
  11. // Janino does not fully support the JVM spec:
  12. // boolean b = (boolean) f(); where f returns Object
  13. // This is not supported and we need to box manually.
  14. val externalResultClassBoxed = primitiveToWrapper(externalResultClass)
  15. val externalResultCasting = if (externalResultClass == externalResultClassBoxed) {
  16. s"($externalResultTypeTerm)"
  17. } else {
  18. s"($externalResultTypeTerm) (${typeTerm(externalResultClassBoxed)})"
  19. }
  20. val externalResultTerm = ctx.addReusableLocalVariable(externalResultTypeTerm, "externalResult")
  21. if (isDeterministic) {
  22. val funcEvalCode =
  23. s"""
  24. |$externalResultCasting $functionTerm
  25. | .$SCALAR_EVAL(${externalOperands.map(_.resultTerm).map(ctx.reuseResultTerm).mkString(", ")});
  26. |""".stripMargin
  27. val reusableFuncExpr = ctx.reuseScalarFuncExpr(funcEvalCode)
  28. if (!reusableFuncExpr.equals(funcEvalCode)) {
  29. ctx.addReusableResultTerm(externalResultTerm, reusableFuncExpr)
  30. }
  31. ctx.addReusableScalarFuncExpr(funcEvalCode, externalResultTerm)
  32. val internalExpr = genToInternalConverterAll(ctx, outputDataType, externalResultTerm)
  33. // function call
  34. internalExpr.copy(code =
  35. s"""
  36. |${externalOperands.map(_.code).mkString("\n")}
  37. |$externalResultTerm = $reusableFuncExpr;
  38. |${internalExpr.code}
  39. |""".stripMargin)
  40. } else {
  41. val internalExpr = genToInternalConverterAll(ctx, outputDataType, externalResultTerm)
  42. // function call
  43. internalExpr.copy(code =
  44. s"""
  45. |${externalOperands.map(_.code).mkString("\n")}
  46. |$externalResultTerm = $externalResultCasting $functionTerm
  47. | .$SCALAR_EVAL(${externalOperands.map(_.resultTerm).mkString(", ")});
  48. |${internalExpr.code}
  49. |""".stripMargin)
  50. }
  51. }

if (isDeterministic)块内的代码实现了UDF表达式重用,即重用生成的第一个result term。笔者就不多解释了,毕竟与上一节的相比已经算是很好理解了(笑

重新编译flink-table模块并执行相同的SQL,就会发现生成的代码发生了变化:

  1. externalResult$8 = (java.util.Map) function_com$sht$bigdata$rt$udf$scalar$SplitQueryParamsAsMap$5cccfdc891a58463898db753288ed577
  2. .eval(isNull$0 ? null : ((java.lang.String) converter$7.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$2)));
  3. isNull$10 = externalResult$8 == null;
  4. result$10 = null;
  5. if (!isNull$10) {
  6. result$10 = (org.apache.flink.table.data.MapData) converter$9.toInternalOrNull((java.util.Map) externalResult$8);
  7. }
  8. // ......
  9. externalResult$24 = externalResult$8;
  10. isNull$25 = externalResult$24 == null;
  11. result$25 = null;
  12. if (!isNull$25) {
  13. result$25 = (org.apache.flink.table.data.MapData) converter$9.toInternalOrNull((java.util.Map) externalResult$24);
  14. }

相关文章

最新文章

更多