在结构化流媒体中,需要对自定义项中的表执行一些查询。问题是,在udf中,如果我尝试使用spark.sql,就会出现空指针异常。在这里最好的方法是什么。
基本上,我需要从一个表流,然后使用该数据从另一个表执行一些范围查询。
如。
val appleFilter = udf((appleId : String) => {
val query = "select count(*) from appleMart where appleId='"+appleId+"'"
val appleCount = spark.sql(query).collect().head.getLong(0)
(appleCount>0)
})
val newApple = apples.filter(appleFilter($"appleId"))
1条答案
按热度按时间weylhg0b1#
对于这项任务来说,这并不是一种正确的方法—您不应该在udf中执行单独的查询,因为spark将无法并行化/优化它们。
更好的方法是在流式Dataframe和
appleMart
Dataframe-这将允许spark优化所有操作。正如我从代码中了解到的,您只需要检查是否有具有给定id的苹果。在这种情况下,您只需执行内部联接—这将只留下在表中有行的idappleMart
,类似于:如果你因为某种原因需要离开
apples
中不存在的条目appleMart
,您可以使用left
加入。。。p、 美国if
appleMart
不会经常更改,可以缓存它。不过,对于流式处理作业,从性能的Angular 来看,类似cassandra的查找表可能会更好。