sparkscala:sqlrlike与customudf

57hvy0tb  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(395)

我有一个场景,其中10k+正则表达式与其他各种列一起存储在一个表中,需要与传入的数据集连接起来。最初我使用的是“sparksqlrlike”方法,如下所示,它能够保持负载,直到传入的记录计数小于50k
ps:正则表达式引用数据是广播数据集。 dataset.join(regexDataset.value, expr("input_column rlike regular_exp_column") 然后我编写了一个自定义的udf,使用scala本地regex搜索来转换它们,如下所示,
下面val以元组数组的形式收集引用数据。

val regexPreCalcArray: Array[(Int, Regex)] = {
        regexDataset.value
            .select( "col_1", "regex_column")
            .collect
            .map(row => (row.get(0).asInstanceOf[Int],row.get(1).toString.r))
    }

正则表达式匹配udf的实现,

def findMatchingPatterns(regexDSArray: Array[(Int,Regex)]): UserDefinedFunction = {
        udf((input_column: String) => {
            for {
                text <- Option(input_column)
                matches = regexDSArray.filter(regexDSValue => if (regexDSValue._2.findFirstIn(text).isEmpty) false else true)
                if matches.nonEmpty
            } yield matches.map(x => x._1).min
        }, IntegerType)
    }

联接如下所示,其中引用数据中的唯一id将在多个正则表达式匹配的情况下从udf返回,并使用唯一id与引用数据联接,以检索结果所需的其他列,

dataset.withColumn("min_unique_id", findMatchingPatterns(regexPreCalcArray)($"input_column"))
.join(regexDataset.value, $"min_unique_id" === $"unique_id" , "left")

但当记录数增加到1m以上时,执行过程中的偏差也会非常慢[1个执行者任务运行了很长时间]。spark建议不要使用udf,因为它会降低性能,我应该在这里应用任何其他最佳实践,或者是否有比我在这里编写的更好的scala regex匹配api?或者任何有效的建议都会很有帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题