我有spark嵌套框架df 1。我想过滤的基础上failed_rules
列有单引号。我试过下面的代码,但得到错误。
import spark.implicits._
val columns = Seq("failed_rules","CMF_BANKRPT_IND","year")
val tupleSeq: Seq[(String, String, Int)] = Seq(("`name: isComplete,args:{column: cmf_lf_assess_num_hist,where_cond: CMF_BANKRPT_IND == 'B'}`", "B" , 1981),("`name: isComplete,Failure_Flag: Error,args:{column: cmf_lf_assess_num_hist,where_cond: CMF_BANKRPT_IND == 'B'}`", "A", 1989),("4", "John", 1991))
val df1=spark.createDataFrame(tupleSeq).toDF(columns:_*)
val extraCols = List("`name: isComplete,args:{column: cmf_lf_assess_num_hist,where_cond: CMF_BANKRPT_IND == 'B'}`")
val rules_wherecond = extraCols.filter( name => name.contains("where_cond"))
for (i <- 0 until rules_wherecond.length) {
df1.filter(s"failed_rules == ${rules_wherecond(i)}").show(false)
}
错误消息:
Exception:无法解析'name: isComplete,args:{column: cmf_lf_assess_num_hist,where_cond: CMF_BANKRPT_IND == 'B'}
'给定输入列:[failed_rules,CMF_BANKRPT_IND,year];第1行pos 16;
2条答案
按热度按时间zbsbpyhn1#
Seq.filter不会自动解析json。所以你的实际where条件不会被解析出来并提供给Spark,所以Spark会正确地抱怨你给它的过滤器不是一个有效的列或表达式。
像这样构建过滤器充满了其他潜在的问题和缺点(如缺乏多个规则或审计等),可能值得单独查看Quality的想法,但核心问题似乎是在使用过滤器之前解析json。
km0tfn4u2#
列名称和筛选值的显式分离可以帮助: