我从一个文件中读取两个变量,并用它过滤到一个Dataframe。变量的计数每天都会变化,所以我需要动态地构建过滤条件,这样就不需要更改代码。
clientid1=1,2,3,4
clientref1=10,20,30,40
clientid2=1,2,3,4
clientref2=10,20,30,40
我逐行读取上面的文件并按“=”拆分,然后将其加载到map[string,string]。为了得到值,我传递键“clientid1”并将值除以“,”
val clientid1 :Array[String] = parameterValues("clientid1").split(",")
我用下面这些变量来过滤df,下面的工作正常。
val finalDF = customerDF
.filter((col("clientid").isin(clientid1: _*) && col("clientref").isin(clientref1: _*))
||(col("clientid").isin(clientid2: _*) && col("clientref").isin(clientref2: _*)))
现在我需要动态构建下面的部分,
(col("clientid").isin(clientid1: _*) && col("clientref").isin(clientref1: _*))
||(col("clientid").isin(clientid2: _*) && col("clientref").isin(clientref2: _*))
如果输入文件中存在clientid3和clientref3,则应设置过滤条件
(col("clientid").isin(clientid1: _*) && col("clientref").isin(clientref1: _*))
||(col("clientid").isin(clientid2: _*) && col("clientref").isin(clientref2: _*))
||(col("clientid").isin(clientid3: _*) && col("clientref").isin(clientref3: _*))
我使用了下面这样的函数来生成条件,但是它返回一个字符串,而筛选条件不接受它,
def generateFilterCond(count: Int, parameterValues: Map[String,String]): String ={
var filterCond = ""
for ( i <- 1 to count){
val clientid :Array[String] = parameterValues("clientid$i").split(",")
val clientref :Array[String] = parameterValues("clientref$i").split(",")
filterCond = filterCond + (col("clientid").isin(clientid: _*) && col("clientref").isin(clientref: _*)) + " || "
}
filterCond.substring(0, length-4).trim
}
关于如何建立这个有什么建议吗?
2条答案
按热度按时间cbjzeqam1#
的确,
filter
点燃SparkColumn
作为参数而不是方法generateFilterCond
返回一个String
. 因此,第一步是更改签名以返回Column
但是,我们现在有一个问题,因为没有初始化filterCond
函数第一行中的一个空String
,我们需要用一个空的Column
,它在spark中不存在。但是我们知道我们将把这个列传递给一个过滤器,并且我们要写的条件只是一个or的序列,我们可以将这个col初始化为一个总是false的条件,比如
lit(false)
.因此,我们可以将您的方法重写如下:
这种方法还存在一些问题。例如,一个
NotSuchElementException
如果parametervaluesMap之间不包含特定的clientid/clientref,则会引发异常1
以及count
. 此外,它不是一个经典的函数样式,带有可变累加器和循环。因此,假设我们忽略了所有有问题的情况(没有clientid/clientref夫妇,或者clientid或clientref中的一个不存在,但另一个存在),这个函数的最终重写可以是:
另外请注意,如果动态筛选条件产生的列表达式太大,则可能会出现问题,因此应将变量文件保持为小的
jaxagkaj2#
为了完成我之前的回答,如果你的变量文件包含很多行,而不是试图建立一个动态的过滤条件,你可以把你的变量文件读入一个带有spark的dataframe中,通过把它和变量文件中的dataframe连接起来过滤你的输入dataframe
无动态过滤条件的过滤Dataframe
在下面的代码中,变量文件是variables.txt,应用过滤器的Dataframe是
input
,spark会话是spark
```import spark.implicits._
def extractInfo(line: String): (Int, String, Seq[Int]) = {
val mainId = line.dropWhile(.isLetter).takeWhile(.isDigit).toInt
val kind = line.takeWhile(.isLetter)
val values = line.dropWhile(s => s.isLetterOrDigit).tail.split(",").map(.toInt)
(mainId, kind, values)
}
def explodeIdRef(infos: Seq[(Int, String, Seq[Int])]): Seq[(Int, Int)] = for {
clientId <- infos.find(_.2 == "clientid").map(.3.toSeq).getOrElse(Nil)
clientRef <- infos.find(.2 == "clientref").map(._3.toSeq).getOrElse(Nil)
} yield (clientId, clientRef)
val config = spark.read.text("variables.txt")
.as[String]
.map(extractInfo)
.groupByKey(_.1)
.flatMapGroups((, iterator) => explodeIdRef(iterator.toSeq))
.toDF("clientid", "clientref")
val filtered = input.join(config, Seq("clientid", "clientref"))
clientid1=1,2,3,4
clientref1=10,20,30,40
clientid2=5,6,7,8
clientref2=50,60,70,80
+---+--------+---------+
|id |clientid|clientref|
+---+--------+---------+
|1 |1 |10 |
|2 |2 |30 |
|3 |8 |10 |
|4 |8 |50 |
|5 |9 |90 |
+---+--------+---------+
+---+--------+---------+
|id |clientid|clientref|
+---+--------+---------+
|1 |1 |10 |
|2 |2 |30 |
|4 |8 |50 |
+---+--------+---------+