我注意到apache flink没有优化表的连接顺序。目前,它保持用户指定的连接顺序(基本上,它从字面上理解查询)。我认为apache方解石可以优化连接顺序,但由于某些原因,这些规则在apache flink中没有使用。
例如,如果我们有两个表'r'和's'
private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")
我们假设‘s’是空的,我们想用两种方式连接这些表:
val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(s.as("x5, x6")).where("x4 === x5 ").select("x1, x6")
val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
.join(r.as("x5, x6")).where("x4 === x5 ").select("x1, x6")
如果我们要计算表1和表2中的行数,在这两种情况下结果都是零。问题是,评估表1比评估表2花费的时间要长得多。
有没有什么方法可以让我们自动优化连接的执行顺序,甚至通过添加一些统计信息来启用可能的计划成本操作?如何添加这些统计数据?
在这个链接的文档中写到,可能有必要更改表环境calciteconfig,但我不清楚如何做。
请帮忙。
1条答案
按热度按时间lokaqttq1#
连接重新排序未启用,因为flink不能很好地处理统计信息。没有精确基数估计的重新排序联接基本上是赌博。因此,将禁用连接重新排序,并按照用户提供的顺序连接表。这提供了一个确定性和可控的行为。
但是,可以通过传递
TableConfig
用一个CalciteConfig
在创建TableEnvironment
,即tableenvironment.gettableenvironment(env,yourtableconfig)。在CalciteConfig
您可以将优化规则添加到不同的优化阶段。你可能想加上JoinCommuteRule
以及JoinAssociateRule
到逻辑优化阶段。您可能还需要深入研究代码,以检查如何将统计信息传递到优化器中。