scala - How to pass a variable to the where clause of a Spark DataFrame

Posted by z4iuyo4d on 2021-05-29 in Spark

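I am trying to pass the variable SCD_filter to the where clause of a DataFrame in Spark, but I get an error; when I pass the same condition directly, it works fine. I am doing this so that, in the future, I can pass this filter dynamically depending on the scenario.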

val SCD_filter = """currentDF.col("u_business_unit") <=> updatedDF.col("u_business_unit")
                     |      and(currentDF.col("u_operation_level_2") <=> updatedDF.col("u_operation_level_2"))
                     |      and(currentDF.col("u_operation_level_3") <=> updatedDF.col("u_operation_level_3"))""".stripMargin

Then I pass the variable to the code below:

val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("currentDF.*")
.where(s"$SCD_filter")  /// passing the variable which is causing the error
.show()

The error I get:

Exception in thread "main" org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'currentdf' not found;

Note: currentDF itself is fine, because the code runs when I remove the variable and pass the condition directly to the where clause instead.
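
The error is most likely raised because where(String) does not evaluate Scala code: the string is parsed as a Spark SQL expression, so currentDF.col("u_business_unit") is read as a call to a function col in a database named currentDF, and that catalog lookup fails. A minimal sketch under that assumption, using a local Spark session and two tiny made-up DataFrames with only Sys_id and u_business_unit (the object and method names are hypothetical), contrasts the Scala-style string with the same condition written as SQL against the DataFrame aliases:

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

object WhereStringDemo {
  // Local session for the sketch only
  private lazy val spark = SparkSession.builder().master("local[1]").appName("where-string").getOrCreate()

  // Joins two tiny hypothetical DataFrames, aliased as in the question
  private def joined() = {
    import spark.implicits._
    val currentDF = Seq((1, "BU1"), (2, "BU2")).toDF("Sys_id", "u_business_unit")
    val updatedDF = Seq((1, "BU1"), (2, "BU9")).toDF("Sys_id", "u_business_unit")
    currentDF.alias("currentDF")
      .join(updatedDF.alias("updatedDF"), currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
  }

  // Scala source inside the string: the SQL parser sees a function call
  // currentDF.col(...), so analysis fails as soon as where() is called
  def scalaStyleFails(): Boolean =
    Try(joined().where("""currentDF.col("u_business_unit") <=> updatedDF.col("u_business_unit")""")).isFailure

  // The same condition written as Spark SQL, referring to the aliases
  def sqlStyleCount(): Long =
    joined().where("currentDF.u_business_unit <=> updatedDF.u_business_unit").count()
}
```

In this sample only Sys_id 1 has the same u_business_unit on both sides, so the SQL-style filter keeps one row.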

Answer #1 by p4rjhz4m

Try this:

val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("*")
// Column1 and data are placeholders; the s-interpolator splices the value into a SQL expression string
.where(s"Column1 = '$data'")
.show()
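
To illustrate the difference between the two interpolators, here is a minimal sketch (the DataFrame contents, the column name Column1 and the value passed as data are hypothetical placeholders). s"..." builds a plain string that where parses as SQL, while Spark's $"..." (from spark.implicits._) builds a column reference, so $"Column1='$data'" would refer to a column whose whole name is the text Column1='a'. Passing the value through the Column API avoids hand-written quoting altogether:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object FilterByValue {
  // Local session for the sketch only
  private lazy val spark = SparkSession.builder().master("local[1]").appName("filter-by-value").getOrCreate()

  // Tiny hypothetical DataFrame using the placeholder column name from the answer
  private def sample() = {
    import spark.implicits._
    Seq(("a", 1), ("b", 2), ("a", 3)).toDF("Column1", "n")
  }

  // Variant 1: splice the value into a SQL expression string.
  // The quotes around $data are part of the SQL text, so a value containing ' breaks it.
  def countViaSqlString(data: String): Long =
    sample().where(s"Column1 = '$data'").count()

  // Variant 2: build a Column; the value becomes a literal and needs no quoting
  def countViaColumn(data: String): Long =
    sample().where(col("Column1") === data).count()
}
```

Both variants keep the two rows where Column1 is "a"; only the Column variant is safe for arbitrary values.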

Answer #2 by zbq4xfa0

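Based on your comment:
"Actually, this code will run for 5 tables, all of which have different columns, and all the conditions will be different."
I would solve it with an approach like this: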

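import org.apache.spark.sql.functions.col

// Alias both sides so qualified names such as "currentDF.u_business_unit" resolve after the join
val current = currentDF.alias("currentDF")
val updated = updatedDF.alias("updatedDF")

// Each condition is a Column expression, not a string, so nothing is parsed as SQL
val cond1 = col("currentDF.u_business_unit")     <=> col("updatedDF.u_business_unit")
val cond2 = col("currentDF.u_operation_level_2") <=> col("updatedDF.u_operation_level_2")
val cond3 = col("currentDF.u_operation_level_3") <=> col("updatedDF.u_operation_level_3")

val SCD_filter = cond1.and(cond2).and(cond3)

val common_unchangedata = current
.join(updated, col("currentDF.Sys_id") === col("updatedDF.Sys_id"), "inner")
.where(SCD_filter)

common_unchangedata.show()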

You can extend this into a function getCondition(tableName: String): Column that builds the appropriate condition at runtime, depending on which table is being processed.
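
One possible shape for getCondition, sketched under assumptions: the per-table column lists live in a hypothetical in-code Map (the table names business_units and regions are made up), both sides are aliased currentDF and updatedDF, and every configured column is compared with null-safe equality and combined with AND. The sample data is invented to show that <=> treats two nulls as equal:

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.col

object ScdConditions {
  // Local session for the sketch only
  private lazy val spark = SparkSession.builder().master("local[1]").appName("scd-conditions").getOrCreate()

  // Hypothetical configuration: columns that must match for a row to count as unchanged
  val compareColumns: Map[String, Seq[String]] = Map(
    "business_units" -> Seq("u_business_unit", "u_operation_level_2", "u_operation_level_3"),
    "regions"        -> Seq("u_region")
  )

  // Null-safe equality (<=>) on every configured column, combined with AND.
  // Assumes both sides of the join are aliased "currentDF" and "updatedDF".
  def getCondition(tableName: String): Column =
    compareColumns(tableName)
      .map(c => col(s"currentDF.$c") <=> col(s"updatedDF.$c"))
      .reduce(_ && _)

  // Small hypothetical data set for the "business_units" table
  def unchangedBusinessUnits(): Long = {
    import spark.implicits._
    val cols = Seq("Sys_id", "u_business_unit", "u_operation_level_2", "u_operation_level_3")
    val currentDF = Seq((1, "BU1", "L2a", "L3a"), (2, "BU2", "L2b", null: String)).toDF(cols: _*)
    val updatedDF = Seq((1, "BU1", "L2a", "L3x"), (2, "BU2", "L2b", null: String)).toDF(cols: _*)
    currentDF.alias("currentDF")
      .join(updatedDF.alias("updatedDF"), col("currentDF.Sys_id") === col("updatedDF.Sys_id"), "inner")
      .where(getCondition("business_units"))
      .select("currentDF.*")
      .count()
  }
}
```

Row 1 differs in u_operation_level_3, while row 2 matches on every column (including the two nulls), so one row is reported as unchanged. Keeping the column lists as data rather than code means a sixth table only needs a new Map entry.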

Answer #3 by qzlgjiam

Create aliases for both DataFrames and refer to the columns as alias.column_name inside the string, like this:

val SCD_filter = """
   (
     (currentDF.u_business_unit <=> updatedDF.u_business_unit) and 
     (currentDF.u_operation_level_2 <=> updatedDF.u_operation_level_2) and 
     (currentDF.u_operation_level_3 <=> updatedDF.u_operation_level_3)
   )
"""
// Both sides need an alias so that currentDF.* and updatedDF.* resolve inside the SQL string
val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF.alias("updatedDF"), currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.where(SCD_filter)   // filter while the updatedDF columns are still in scope
.select("currentDF.*")
.show()
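
Because the filter here is just a string, it can also be generated from a list of column names when each table compares different columns. A minimal sketch in plain Scala (the object name ScdSqlFilter and the default alias names are assumptions matching this answer): it produces the same kind of predicate as SCD_filter above, which can then be passed to .where(...) on the aliased join.

```scala
object ScdSqlFilter {
  // Builds a Spark SQL predicate such as
  // "(currentDF.a <=> updatedDF.a) and (currentDF.b <=> updatedDF.b)".
  // Assumes both DataFrames are aliased with these names before the join.
  def buildFilter(columns: Seq[String],
                  left: String = "currentDF",
                  right: String = "updatedDF"): String = {
    require(columns.nonEmpty, "at least one column is needed")
    columns.map(c => s"($left.$c <=> $right.$c)").mkString(" and ")
  }

  def main(args: Array[String]): Unit =
    println(buildFilter(Seq("u_business_unit", "u_operation_level_2", "u_operation_level_3")))
}
```

Column names containing spaces or other special characters would additionally need backtick quoting inside the generated SQL.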
