我试图在flink的datasetapi中实现以下简单查询。
select
t1_value1
from
table1
where
t1_suppkey not in (
select
t2_suppkey
from
table2
)
所以我的想法是执行一个left-outer连接(表1.leftouterjoin(表2)…),然后删除所有得到t1_suppkey和t2_suppkey值的行。
所以我试着这样:
output = table1
.leftOuterJoin(table2).where("t1_suppkey").equalTo("t2_suppkey")
.with((Table1 t1, Table2 t2) -> new Tuple2<>(t1.ps_suppkey, t2.s_suppkey))
.returns(new TypeHint <Tuple2<Integer, Integer>>() {});
但是,如果我这样做,它总是会失败“java.lang.nullpointerexception”,我不知道为什么。如果我使用普通连接而不是左外连接,代码可以工作,但这不是我想要的。
我是否需要以不同的方式实现左连接,或者是否有更简单的方法重写DataSetaAPI中的“notin”语句?
2条答案
按热度按时间tyu7yeag1#
dataset api的外部联接调用
JoinFunction
也适用于在内侧找不到连接记录的外部记录。在这种情况下JoinFunction.join()
方法被调用null
.因为您使用的是左外连接,所以第二个参数
Table2 t2
可以是null
. 这个NullPointerException
是由t2.s_suppkey
. 你需要检查一下t2 == null
只有进入t2
如果不为空。您还可以使用
FlatJoinFunction
有一个Collector
参数和仅发射t1
如果t2 == null
.另一个选择是使用flink的批处理sql支持,它支持示例中的查询。
5w9g7ksd2#