org.apache.flink.table.api.TableException: Arity [3] of result does not match the number [2] of requested type

nnsrf1az  posted on 2021-06-21  in Flink

I have a simple test case for learning how to use the Table API with CASE/WHEN, shown below:

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.api.scala._

    case class Person(name: String, age: Int)

    object TableTest {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val te = TableEnvironment.getTableEnvironment(env)
        val ds = env.fromCollection(Seq(Person("a", 20), Person("b", 40), Person("c", 60)))
        te.registerDataSet("person", ds)
        // Query adds a computed column, age_level, next to name and age
        val table = te.sqlQuery(
          """
          select name, age,
          case
            when age <= 20 then 'A'
            when age <= 40 then 'B'
            when age <= 60 then 'C'
            else 'D'
          end as age_level
          from person
          """.stripMargin(' '))
        // This conversion is the call that throws the TableException
        te.toDataSet[Person](table).print()
      }
    }

When I run it, I get the following exception. age_level is a computed column, and I don't understand why this fails.

    Exception in thread "main" org.apache.flink.table.api.TableException: Arity [3] of result [ArrayBuffer(String, Integer, String)] does not match the number[2] of requested type [com.flink.table.Person(name: String, age: Integer)].
        at org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:1165)
        at org.apache.flink.table.api.BatchTableEnvironment.getConversionMapper(BatchTableEnvironment.scala:339)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:504)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
        at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
        at com.flink.table.TableTest$.main(TableTest.scala:37)
        at com.flink.table.TableTest.main(TableTest.scala)

k5ifujac  #1

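The problem is that the table you are converting to DataSet[Person] has three attributes, (name, age, age_level), while the Person case class has only two fields, (name, age). The converter requires the number of result columns to equal the number of fields in the requested type, which is exactly what the message "Arity [3] ... does not match the number[2]" reports.
You can implement a new case class

    case class PersonWithAgeLevel(name: String, age: Int, age_level: String)

and convert the table to DataSet[PersonWithAgeLevel].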
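To make the fix concrete, here is a minimal sketch of the corrected program. It assumes the same legacy batch Table API as the question (the `TableEnvironment.getTableEnvironment` entry point). The names `TableTestFixed` and `twoColumns` are illustrative and not from the original post. The second conversion shows an alternative under the same assumptions: keep `Person` and select only the two columns it can hold.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment

case class Person(name: String, age: Int)

// One field per selected column, in the same order as the SELECT list.
case class PersonWithAgeLevel(name: String, age: Int, age_level: String)

// Hypothetical object name, used only for this sketch.
object TableTestFixed {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val te = TableEnvironment.getTableEnvironment(env)
    val ds = env.fromCollection(Seq(Person("a", 20), Person("b", 40), Person("c", 60)))
    te.registerDataSet("person", ds)

    // Three result columns: name, age, age_level.
    val table = te.sqlQuery(
      """
        |select name, age,
        |  case
        |    when age <= 20 then 'A'
        |    when age <= 40 then 'B'
        |    when age <= 60 then 'C'
        |    else 'D'
        |  end as age_level
        |from person
      """.stripMargin)

    // The requested type now has three fields, so the arity check passes.
    te.toDataSet[PersonWithAgeLevel](table).print()

    // Alternative: project down to two columns and keep the Person type.
    val twoColumns = te.sqlQuery("select name, age from person")
    te.toDataSet[Person](twoColumns).print()
  }
}
```

Either way, the rule is the same: the type passed to `toDataSet` needs as many fields as the query has columns, with compatible types in the same order.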
