基本上,我想在dataframe的每一行上应用countsimilarcolumns函数,并将结果放入一个新的列中。
我的代码如下
def main(args: Array[String]) = {
val customerID = "customer-1" //args(0)
val rawData = readFromResource("json", "/spark-test-data-copy.json")
val flattenData = rawData.select(flattenSchema(rawData.schema): _*)
val referenceCustomerRow = flattenData.transform(getCustomer(customerID)).first
}
def getCustomer(customerID: String)(dataFrame: DataFrame) = {
dataFrame.filter($"customer" === customerID)
}
def countSimilarColumns(first: Row, second: Row): Int = {
if (!(first.getAs[String]("customer").equals(second.getAs[String]("customer"))))
first.toSeq.zip(second.toSeq).count { case (x, y) => x == y }
else
-1
}
我想做如下的事情。但我不知道怎么做。
flattenData
.withColumn(
"similarity_score",
flattenData.map(row => countSimilarColumns(row, referenceCustomerRow))
)
.show()
样本数据:
{"customer":"customer-1","att-a":"7","att-b":"3","att-c":"10","att-d":"10"}
{"customer":"customer-2","att-a":"9","att-b":"7","att-c":"12","att-d":"4"}
{"customer":"customer-3","att-a":"7","att-b":"3","att-c":"1","att-d":"10"}
{"customer":"customer-4","att-a":"9","att-b":"14","att-c":"10","att-d":"4"}
期望输出:
+--------------------+-----------+
| customer | similarity_score |
+--------------------+-----------+
|customer-1 | -1 |
|customer-2 | 0 |
|customer-3 | 3 |
|customer-4 | 1 |
自定义项是唯一的方法吗?如果是的话,那么我想保持我的函数count列与它相同,这样它是可测试的。怎么可能?我是spark/scala的新手。
2条答案
按热度按时间ego6inou1#
您需要充分了解的是,df上的所有标准、非聚合操作一次只能看到一行。所以,你必须从一行或一个集合的Angular 来考虑。在单行上下文中对整个df调用函数
withColumn
这个电话很不寻常。当您要将行与其他行进行比较/合并时,通常会将表与另一个表或表本身联接,然后使用合并的表。你想要的是类似的东西
这允许您匹配两个df的行。
udf在这里并不真正适用,因为与内置函数类似,udf应用于一个(或多个)特定列以生成值(可以使用
withColumn
),您要查看整排。nmpmafwu2#
flattenData
属于类型DataFrame
&在数据上应用map函数可以得到Dataset
.你通过了考试的成绩
flattenData.map(row => countSimilarColumns(row, referenceCustomerRow))
至withColumn
但是withColumn
只能获取类型的数据org.apache.spark.sql.Column
所以,如果你想添加以上结果UDF
必须使用的列collect
函数并将其传递给lit
请检查下面的代码。基于样本数据,添加以下逻辑。
最终结果