我试图按照文档创建一个表函数来“展平”一些数据。当使用 joinLateral
但在使用时要进行展平 leftOuterJoinLateral
我得到以下错误。我使用的是scala,并尝试了表api和sql,得到了相同的结果:
原因:java.lang.nullpointerexception:不能将null结果存储在case类中。
我的工作是:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.TableFunction
object example_job{
// Split the List[Int] into multiple rows
class Split() extends TableFunction[Int] {
def eval(nums: List[Int]): Unit = {
nums.foreach(x =>
if(x != 3) {
collect(x)
})
}
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironment()
val tableEnv = StreamTableEnvironment.create(env)
val splitMe = new Split()
// Create some dummy data
val events: DataStream[(String, List[Int])] = env.fromElements(("simon", List(1,2,3)), ("jessica", List(3)))
val table = tableEnv.fromDataStream(events, 'name, 'numbers)
.leftOuterJoinLateral(splitMe('numbers) as 'number)
.select('name, 'number)
table.toAppendStream[(String, Int)].print()
env.execute("Flink jira ticket example")
}
}
当我改变的时候 .leftOuterJoinLateral
至 .joinLateral
我得到了预期的结果:
(simon,1)
(simon,2)
当使用 .leftOuterJoinLateral
我希望有这样的结果:
(simon,1)
(simon,2)
(simon,null) // or (simon, None)
(jessica,null) // or (jessica, None)
似乎这可能是scala api的一个bug?我想在开罚单之前先检查一下这里,以防万一我做了什么蠢事!
1条答案
按热度按时间kgsdhlau1#
问题是flink per default确实期望一行的所有字段都是非空的。这就是程序在看到错误时失败的原因
null
外部联接操作的结果。为了接受null
值,则需要通过或者必须指定结果类型以允许空值,例如,指定自定义pojo输出类型:
具有