Flink SQL: joining tables that have a time attribute, using pure SQL syntax

Asked by krcsximq on 2021-06-24, in Flink

I am having trouble joining multiple tables with Flink's SQL syntax when at least one of the tables has a time attribute column.
I have a table Table1 with schema (id, value1, rowtime), where rowtime is used as the Flink rowtime attribute. I want to join this table with a table Table2 with schema (id, value2). The join must match on id.
Finally, I want to group the result of this join using a tumbling time window.
Is it possible to do this using only SQL syntax?
Here is an example of what I am trying to do:

  SELECT
    Table1.id as id,
    TUMBLE_END(rowtime, INTERVAL '10' SECOND),
    MAX(value1) as value1,
    MAX(value2) as value2
  FROM Table1 JOIN Table2 ON Table1.id = Table2.id
  GROUP BY Table1.id, TUMBLE(rowtime, INTERVAL '10' SECOND)

But it gives me the following error:

  2019-11-12 16:37:57.191 [main] ERROR - Cannot generate a valid execution plan for the given query:
  FlinkLogicalCalc(expr#0..6=[{inputs}], id=[$t0], EXPR$1=[$t4], value1=[$t1], value2=[$t2])
  FlinkLogicalWindowAggregate(group=[{0}], value1=[MAX($2)], value2=[MAX($3)])
  FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[0], proj#0..1=[{exprs}], value1=[$t3], value2=[$t3])
  FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
  FlinkLogicalTableSourceScan(table=[[Table1]], fields=[id, value1, rowtime], source=[KafkaTableSource(id, value1, rowtime)])
  FlinkLogicalTableSourceScan(table=[[Table2]], fields=[id, value2], source=[Table2_Type(id, value2)])
  Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
  Please check the documentation for the set of currently supported SQL features.
  org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
  FlinkLogicalCalc(expr#0..6=[{inputs}], id=[$t0], EXPR$1=[$t4], value1=[$t1], value2=[$t2])
  FlinkLogicalWindowAggregate(group=[{0}], value1=[MAX($2)], value2=[MAX($3)])
  FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[0], proj#0..1=[{exprs}], value1=[$t3], value2=[$t3])
  FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
  FlinkLogicalTableSourceScan(table=[[kafkaDataStream]], fields=[id, value1, rowtime], source=[KafkaTableSource(id, value1, rowtime)])
  FlinkLogicalTableSourceScan(table=[[SensorConfigurationUpdateHTTP]], fields=[id, value2], source=[Table2_Type(id, value2)])
  Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
  Please check the documentation for the set of currently supported SQL features.
  at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
  at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
  at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
  at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:379)
  at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
  at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
  ...

I also tried casting rowtime to TIMESTAMP (as the error message suggests), but then I can no longer use time windows. That leads to the following error:

  2019-11-12 16:44:52.473 [main] ERROR - Window can only be defined over a time attribute column.
  org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
  at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
  at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
  at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
  at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
  at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
  at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
  at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
  at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
  at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
  at org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
  at org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
  at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:813)
  at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:379)
  at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
  at org.apache.flink.table.api.Table.insertInto(table.scala:1126)

Answer (mf98qq94):

The result of a join must not contain a time attribute, because the join cannot guarantee that the order of timestamps is preserved. Flink assumes that both tables are dynamic and can change at any point in time: a new record in Table2 might join with an old record in Table1, producing results whose timestamps arrive in "random" order.
You can change this by adding a time constraint to the join. Either define the query as a time-windowed join, or model Table2 as a temporal table and join Table1 against it.
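For illustration, a time-windowed (interval) join could look like the sketch below. Note this assumes Table2 also carries a rowtime attribute (called rt2 here), which the original (id, value2) schema does not have, so you would need to add one; the one-minute bound is an arbitrary example value.

```sql
-- Hypothetical sketch: interval join followed by a tumbling window.
-- Assumes Table2 has been extended with its own rowtime attribute rt2;
-- the 1-minute join bound is an arbitrary choice for illustration.
SELECT
  t1.id AS id,
  TUMBLE_END(t1.rowtime, INTERVAL '10' SECOND),
  MAX(t1.value1) AS value1,
  MAX(t2.value2) AS value2
FROM Table1 t1
JOIN Table2 t2
  ON t1.id = t2.id
  AND t1.rowtime BETWEEN t2.rt2 - INTERVAL '1' MINUTE
                     AND t2.rt2 + INTERVAL '1' MINUTE
GROUP BY t1.id, TUMBLE(t1.rowtime, INTERVAL '10' SECOND)
```

Because the join condition now bounds the timestamps of both inputs, the planner can preserve rowtime as a time attribute, so the tumbling window remains valid. If Table2 cannot carry a timestamp, the temporal-table route is the alternative; in the Flink version shown in the stack trace (legacy planner), that meant registering a temporal table function via the Table API rather than using pure SQL.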
