Spark: add a column with map logic, without using a UDF

4szc88ey · posted 2021-05-27 in Spark

Basically, I want to apply the countSimilarColumns function to every row of a DataFrame and put the result in a new column.
My code looks like this:

def main(args: Array[String]) = {
    val customerID           = "customer-1" //args(0)
    // readFromResource and flattenSchema are helper functions defined elsewhere
    val rawData              = readFromResource("json", "/spark-test-data-copy.json")
    val flattenData          = rawData.select(flattenSchema(rawData.schema): _*)
    val referenceCustomerRow = flattenData.transform(getCustomer(customerID)).first
  }

def getCustomer(customerID: String)(dataFrame: DataFrame) = {
    dataFrame.filter($"customer" === customerID)
  }

// counts the positions at which the two rows hold equal values;
// returns the sentinel -1 when both rows belong to the same customer
def countSimilarColumns(first: Row, second: Row): Int = {
    if (!(first.getAs[String]("customer").equals(second.getAs[String]("customer"))))
      first.toSeq.zip(second.toSeq).count { case (x, y) => x == y }
    else
      -1
  }

I want to do something like the following, but I don't know how:

flattenData
  .withColumn(
    "similarity_score",
    // note: this does not compile, since map returns a Dataset while withColumn needs a Column
    flattenData.map(row => countSimilarColumns(row, referenceCustomerRow))
  )
  .show()

Sample data:

{"customer":"customer-1","att-a":"7","att-b":"3","att-c":"10","att-d":"10"}
{"customer":"customer-2","att-a":"9","att-b":"7","att-c":"12","att-d":"4"}
{"customer":"customer-3","att-a":"7","att-b":"3","att-c":"1","att-d":"10"}
{"customer":"customer-4","att-a":"9","att-b":"14","att-c":"10","att-d":"4"}

Expected output:

+----------+----------------+
|customer  |similarity_score|
+----------+----------------+
|customer-1|-1              |
|customer-2|0               |
|customer-3|3               |
|customer-4|1               |
+----------+----------------+

Is a UDF the only way to do this? If so, I would like to keep my countSimilarColumns function as it is, so that it stays testable. How can that be done? I'm new to Spark/Scala.
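Since testability is the concern, note that countSimilarColumns is a plain function over Rows, so it can be unit-tested without a SparkSession. A minimal sketch (GenericRowWithSchema is a Spark-internal class, but it is commonly used in tests to build schema-aware rows; plain assert stands in for a test framework, and testRow is a hypothetical helper):

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("customer", StringType),
  StructField("att-a", StringType),
  StructField("att-b", StringType)
))

// hypothetical helper: builds a schema-aware Row from attribute values
def testRow(values: String*) =
  new GenericRowWithSchema(values.toArray[Any], schema)

// different customers: score is the count of matching positions (only att-a matches)
assert(countSimilarColumns(testRow("customer-1", "7", "3"), testRow("customer-2", "7", "4")) == 1)
// same customer: the function returns the sentinel -1
assert(countSimilarColumns(testRow("customer-1", "7", "3"), testRow("customer-1", "7", "9")) == -1)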

ego6inou #1

What you need to fully appreciate is that all standard, non-aggregating operations on a DataFrame see only one row at a time. So you have to think in terms of either a single row or a whole set of rows. Calling a function on the entire DataFrame inside the single-row context of withColumn is unusual. When you want to compare or combine rows with other rows, you typically join the table with another table (or with itself) and then work with the joined table.
You want something like this:

// create a DF with similarity scores AND customer, which is treated as a row id here
// (requires import spark.implicits._ for the tuple encoder and toDF)
val similarityDF = flattenData
  .map(row => row.getAs[String]("customer") -> countSimilarColumns(row, referenceCustomerRow))
  .toDF("customer", "similarity_score")

// join your original DF with similarityDF on "customer"
flattenData.join(similarityDF, usingColumn = "customer").show()

This lets you match up rows between the two DataFrames.
A UDF does not really apply here because, like the built-in functions, a UDF is applied to one (or more) specific columns to produce a value (which you can then use with withColumn), whereas here you want to look at the whole row.
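For completeness, a minimal end-to-end sketch of this approach (assuming Spark 3.x, a local master, and the sample JSON at a hypothetical path; the sample records are already flat, so a plain JSON read stands in for flattenData):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("similarity").getOrCreate()
// supplies the encoder that map and toDF need
import spark.implicits._

val flattenData = spark.read.json("/path/to/spark-test-data-copy.json")
val referenceCustomerRow = flattenData.filter($"customer" === "customer-1").head

// Row is serializable, so referenceCustomerRow can be captured in the map closure
val similarityDF = flattenData
  .map(row => row.getAs[String]("customer") -> countSimilarColumns(row, referenceCustomerRow))
  .toDF("customer", "similarity_score")

flattenData.join(similarityDF, "customer").show()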

nmpmafwu #2

flattenData is of type DataFrame, and applying the map function to the data gives you a Dataset.
You are passing the result of flattenData.map(row => countSimilarColumns(row, referenceCustomerRow)) to withColumn, but withColumn only accepts a value of type org.apache.spark.sql.Column. So if you want to add the result above without a UDF, you have to use the collect function and pass the result to lit. Check the code below.

flattenData
  .withColumn("similarity_score", lit(
    flattenData
      .map(row => countSimilarColumns(row, referenceCustomerRow))
      .collect
      .map(_.toInt)
  ))
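Note that this collects every score to the driver and embeds the whole array as one literal, so similarity_score becomes an array column holding all scores, repeated identically on every row, rather than the per-row scalar the question asks for; it is only workable for small data.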

Based on the sample data, the logic below takes a different approach with window functions: it scores each row by how many of its attribute values also occur in at least one other row (rather than comparing against a fixed reference customer).

scala> df.show(false)
+-----+-----+-----+-----+----------+
|att-a|att-b|att-c|att-d|customer  |
+-----+-----+-----+-----+----------+
|7    |3    |10   |10   |customer-1|
|9    |7    |12   |4    |customer-2|
|7    |3    |1    |10   |customer-3|
|9    |14   |10   |4    |customer-4|
+-----+-----+-----+-----+----------+

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val conditions = df.columns.filterNot(_ == "customer").map(c => (when(count(col(c)).over(Window.partitionBy(col(c)).orderBy(col(c).asc)) =!= 1,lit(1)).otherwise(0))).reduce(_ + _) // add 1 for each attribute whose value occurs more than once in its column, else 0
conditions: org.apache.spark.sql.Column = (((CASE WHEN (NOT (count(att-a) OVER (PARTITION BY att-a ORDER BY att-a ASC NULLS FIRST unspecifiedframe$()) = 1)) THEN 1 ELSE 0 END + CASE WHEN (NOT (count(att-b) OVER (PARTITION BY att-b ORDER BY att-b ASC NULLS FIRST unspecifiedframe$()) = 1)) THEN 1 ELSE 0 END) + CASE WHEN (NOT (count(att-c) OVER (PARTITION BY att-c ORDER BY att-c ASC NULLS FIRST unspecifiedframe$()) = 1)) THEN 1 ELSE 0 END) + CASE WHEN (NOT (count(att-d) OVER (PARTITION BY att-d ORDER BY att-d ASC NULLS FIRST unspecifiedframe$()) = 1)) THEN 1 ELSE 0 END)

Final result:

scala> df.withColumn("similarity_score",conditions).show(false)
+-----+-----+-----+-----+----------+----------------+
|att-a|att-b|att-c|att-d|customer  |similarity_score|
+-----+-----+-----+-----+----------+----------------+
|9    |7    |12   |4    |customer-2|2               |
|7    |3    |1    |10   |customer-3|3               |
|7    |3    |10   |10   |customer-1|4               |
|9    |14   |10   |4    |customer-4|3               |
+-----+-----+-----+-----+----------+----------------+
