在databricks中从udf内部查询delta lake

ffdz8vbo  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(520)

在结构化流媒体中,需要对自定义项中的表执行一些查询。问题是,在udf中,如果我尝试使用spark.sql,就会出现空指针异常。在这里最好的方法是什么。
基本上,我需要从一个表流,然后使用该数据从另一个表执行一些范围查询。
如。

val appleFilter = udf((appleId : String) => {
     val query = "select count(*) from appleMart where appleId='"+appleId+"'"
     val appleCount = spark.sql(query).collect().head.getLong(0)
     (appleCount>0)
})

val newApple = apples.filter(appleFilter($"appleId"))
weylhg0b

weylhg0b1#

对于这项任务来说,这并不是一种正确的方法—您不应该在udf中执行单独的查询,因为spark将无法并行化/优化它们。
更好的方法是在流式Dataframe和 appleMart Dataframe-这将允许spark优化所有操作。正如我从代码中了解到的,您只需要检查是否有具有给定id的苹果。在这种情况下,您只需执行内部联接—这将只留下在表中有行的id appleMart ,类似于:

val appleMart = spark.read.format("delta").load("path_to_delta")
val newApple = apples.join(appleMart, apples("appleId") === appleMart("appleId"))

如果你因为某种原因需要离开 apples 中不存在的条目 appleMart ,您可以使用 left 加入。。。
p、 美国if appleMart 不会经常更改,可以缓存它。不过,对于流式处理作业,从性能的Angular 来看,类似cassandra的查找表可能会更好。

相关问题