scala—如何创建动态筛选条件并使用它筛选sparkDataframe上的行?

csga3l58  于 2021-05-19  发布在  Spark
关注(0)|答案(2)|浏览(690)

我从一个文件中读取两个变量,并用它过滤到一个Dataframe。变量的计数每天都会变化,所以我需要动态地构建过滤条件,这样就不需要更改代码。

clientid1=1,2,3,4
clientref1=10,20,30,40 
clientid2=1,2,3,4
clientref2=10,20,30,40

我逐行读取上面的文件并按“=”拆分,然后将其加载到map[string,string]。为了得到值,我传递键“clientid1”并将值除以“,”

val clientid1 :Array[String] = parameterValues("clientid1").split(",")

我用下面这些变量来过滤df,下面的工作正常。

val finalDF = customerDF
              .filter((col("clientid").isin(clientid1: _*) && col("clientref").isin(clientref1: _*))
                      ||(col("clientid").isin(clientid2: _*) && col("clientref").isin(clientref2: _*)))

现在我需要动态构建下面的部分,

(col("clientid").isin(clientid1: _*) && col("clientref").isin(clientref1: _*))
                          ||(col("clientid").isin(clientid2: _*) && col("clientref").isin(clientref2: _*))

如果输入文件中存在clientid3和clientref3,则应设置过滤条件

(col("clientid").isin(clientid1: _*) && col("clientref").isin(clientref1: _*))
||(col("clientid").isin(clientid2: _*) && col("clientref").isin(clientref2: _*))
||(col("clientid").isin(clientid3: _*) && col("clientref").isin(clientref3: _*))

我使用了下面这样的函数来生成条件,但是它返回一个字符串,而筛选条件不接受它,

def generateFilterCond(count: Int, parameterValues: Map[String,String]): String ={
     var filterCond = ""
     for ( i <- 1 to count){
     val clientid :Array[String] = parameterValues("clientid$i").split(",")
     val clientref :Array[String] = parameterValues("clientref$i").split(",")

        filterCond = filterCond + (col("clientid").isin(clientid: _*) && col("clientref").isin(clientref: _*)) + " || " 
     }
        filterCond.substring(0, length-4).trim
}

关于如何建立这个有什么建议吗?

cbjzeqam

cbjzeqam1#

的确, filter 点燃Spark Column 作为参数而不是方法 generateFilterCond 返回一个 String . 因此,第一步是更改签名以返回 Column 但是,我们现在有一个问题,因为没有初始化 filterCond 函数第一行中的一个空 String ,我们需要用一个空的 Column ,它在spark中不存在。
但是我们知道我们将把这个列传递给一个过滤器,并且我们要写的条件只是一个or的序列,我们可以将这个col初始化为一个总是false的条件,比如 lit(false) .
因此,我们可以将您的方法重写如下:

def generateFilterCond(count: Int, parameterValues: Map[String,String]): Column = {
    var filterCond = lit(false)
    for (i <- 1 to count) {
      val clientid: Array[String] = parameterValues(s"clientid$i").split(",")
      val clientref: Array[String] = parameterValues(s"clientref$i").split(",")

      val addedCondition = col("clientid").isin(clientid: _*) && col("clientref").isin(clientref: _*)

      filterCond = filterCond || addedCondition
    }
    filterCond
  }

这种方法还存在一些问题。例如,一个 NotSuchElementException 如果parametervaluesMap之间不包含特定的clientid/clientref,则会引发异常 1 以及 count . 此外,它不是一个经典的函数样式,带有可变累加器和循环。
因此,假设我们忽略了所有有问题的情况(没有clientid/clientref夫妇,或者clientid或clientref中的一个不存在,但另一个存在),这个函数的最终重写可以是:

def generateFilterCond(count: Int, parameterValues: Map[String, String]): Column = (1 to count)
    .map(i => (parameterValues.get(s"clientid$i"), parameterValues.get(s"clientref$i")))
    .flatMap(elem => for {
      clientIds <- elem._1
      clientRefs <- elem._2
    } yield col("clientid").isin(clientIds.split(","): _*) && col("clientref").isin(clientRefs: _*)
    ).foldLeft(lit(false))((acc, c) => acc || c)

另外请注意,如果动态筛选条件产生的列表达式太大,则可能会出现问题,因此应将变量文件保持为小的

jaxagkaj

jaxagkaj2#

为了完成我之前的回答,如果你的变量文件包含很多行,而不是试图建立一个动态的过滤条件,你可以把你的变量文件读入一个带有spark的dataframe中,通过把它和变量文件中的dataframe连接起来过滤你的输入dataframe

无动态过滤条件的过滤Dataframe

在下面的代码中,变量文件是variables.txt,应用过滤器的Dataframe是 input ,spark会话是 spark ```
import spark.implicits._

def extractInfo(line: String): (Int, String, Seq[Int]) = {
val mainId = line.dropWhile(.isLetter).takeWhile(.isDigit).toInt
val kind = line.takeWhile(.isLetter)
val values = line.dropWhile(s => s.isLetterOrDigit).tail.split(",").map(
.toInt)
(mainId, kind, values)
}

def explodeIdRef(infos: Seq[(Int, String, Seq[Int])]): Seq[(Int, Int)] = for {
clientId <- infos.find(_.2 == "clientid").map(.3.toSeq).getOrElse(Nil)
clientRef <- infos.find(
.2 == "clientref").map(._3.toSeq).getOrElse(Nil)
} yield (clientId, clientRef)

val config = spark.read.text("variables.txt")
.as[String]
.map(extractInfo)
.groupByKey(_.1)
.flatMapGroups((
, iterator) => explodeIdRef(iterator.toSeq))
.toDF("clientid", "clientref")

val filtered = input.join(config, Seq("clientid", "clientref"))

extractinfo将解析variables.txt中的一行并提取有用的列。例如,如果行是 `"clientid1=2,3"` ,extractinfo返回 `(1, "clientid", Seq(2, 3))` explodedRef将在给定一系列提取信息的情况下创建所有clientid/clientref。例如,如果你有 `Seq((1, "clientid", Seq(2, 3)), (1, "clientref", Seq(20, 30)))` ,explodedRef返回 `Seq((2, 20), (2, 30), (3, 20), (3, 30))` ####试验
如果variables.txt包含以下行:

clientid1=1,2,3,4
clientref1=10,20,30,40
clientid2=5,6,7,8
clientref2=50,60,70,80

你有以下几点 `input` Dataframe:

+---+--------+---------+
|id |clientid|clientref|
+---+--------+---------+
|1 |1 |10 |
|2 |2 |30 |
|3 |8 |10 |
|4 |8 |50 |
|5 |9 |90 |
+---+--------+---------+

这个 `filtered` Dataframe将是:

+---+--------+---------+
|id |clientid|clientref|
+---+--------+---------+
|1 |1 |10 |
|2 |2 |30 |
|4 |8 |50 |
+---+--------+---------+

相关问题