我是斯卡拉的新人。我已经使用udf实现了一个多列数据集验证的解决方案,而不是遍历for循环中的各个列。但我不知道这是如何工作更快,我必须解释这是更好的解决办法。
用于数据验证的列将在运行时接收,因此我们不能在代码中硬编码列名。当列值在验证中失败时,还需要用列名更新comments列。
旧代码,
def doValidate(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] = {
var ValidDF: Dataset[Row] = data
var i:Int = 0
for (s <- columnArray) {
var list = validValueArrays(i).split(",")
ValidDF = ValidDF.withColumn("comments",when(ValidDF.col(s).isin(list: _*),concat(lit(col("comments")),lit(" Error: Invalid Records in: ") ,lit(s))).otherwise(col("comments")))
i = i + 1
}
return ValidDF;
}
新代码,
def validateColumnValues(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] = {
var ValidDF: Dataset[Row] = data
var checkValues = udf((row: Row, comment: String) => {
var newComment = comment
for (s: Int <- 0 to row.length-1) {
var value = row.get(s)
var list = validValueArrays(s).split(",")
if(!list.contains(value))
{
newComment = newComment + " Error:Invalid Records in: " + columnArray(s) +";"
}
}
newComment
});
ValidDF = ValidDF.withColumn("comments",checkValues(struct(columnArray.head, columnArray.tail: _*),col("comments")))
return ValidDF;
}
columnarray-->将具有列列表
validvaluearrays-->将具有与列数组位置相对应的有效值。将分隔多个有效值。
我想知道哪一个更好或任何其他更好的方法来做这件事。当我测试新代码时,它看起来更好。另外,这两种逻辑之间的区别是,正如我读到的,udf是spark的黑盒子。在这种情况下,自定义项在任何情况下都会影响性能?
1条答案
按热度按时间qhhrdooz1#
在运行它之前,我需要纠正一些封闭的括号。返回validdf时要删除一个'}'。我仍然得到一个运行时分析错误。
最好避免使用自定义项,因为自定义项意味着反序列化以处理经典scala中的数据,然后重新序列化。但是,如果您的需求无法使用内置sql函数存档,那么您必须使用udf,但您必须确保查看sparkui的性能和执行计划。