我有自定义关系函数(我们称之为 SPEC.relate(a, b)
)我想用注入的规则来优化。我希望只是重写逻辑计划,而不是实施我自己的计划 *Exec
实际计划。
鉴于以下情况。。。
a.join(b, SPEC.relate(a.col("spec_field"), b.col("spec_field")))
``` `a.spec_field` 以及 `b.spec_field` 可以Map到一个或多个关键点(是的,关键点之间存在重复项)
优化的版本看起来像这样。。。
a_with_key = a.withColumn("synth_key", F.explode(SPEC.generate_key_array("spec_field")))
b_with_key = b.withColumn("synth_key", F.explode(SPEC.generate_key_array("spec_field")))
a_with_key.join(b_with_key, "synth_key")
.where(SPEC.relate(a_with_key.col("spec_field"), b_with_key.col("spec_field")))
我的想法是这样的,但我不太明白我需要做什么才能使这一逻辑计划实现。
object SpecRelateRule extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
case x @ Join(left, right, joinType, condition @ Some(SPEC.relate(leftExpr, rightExpr)), hint) =>
// TODO can I do the following?
// leftKeys = left.withColumn("synth_key", F.explode(SPEC.generate_key_array(leftExpr)))
// rightKeys = right.withColumn("synth_key", F.explode(SPEC.generate_key_array(rightExpr)))
// joined = Join(leftKeys, rightKeys, joinType, condition, StrategyHint("merge", "synth_key"))
// somehow drop join key
}
}
暂无答案!
目前还没有任何答案,快来回答吧!