Spark: add a column with map logic, without using a UDF

Asked by 4szc88ey on 2021-05-27 in Spark

Basically, I want to apply the countSimilarColumns function to every row of the DataFrame and put the result in a new column.
My code is as follows:

```scala
def main(args: Array[String]) = {
  val customerID = "customer-1" // args(0)
  val rawData = readFromResource("json", "/spark-test-data-copy.json")
  val flattenData = rawData.select(flattenSchema(rawData.schema): _*)
  val referenceCustomerRow = flattenData.transform(getCustomer(customerID)).first
}

def getCustomer(customerID: String)(dataFrame: DataFrame) = {
  dataFrame.filter($"customer" === customerID)
}

def countSimilarColumns(first: Row, second: Row): Int = {
  if (!first.getAs[String]("customer").equals(second.getAs[String]("customer")))
    first.toSeq.zip(second.toSeq).count { case (x, y) => x == y }
  else
    -1
}
```

I want to do something like the following, but I don't know how:

```scala
flattenData
  .withColumn(
    "similarity_score",
    flattenData.map(row => countSimilarColumns(row, referenceCustomerRow))
  )
  .show()
```

Sample data:

  1. {"customer":"customer-1","att-a":"7","att-b":"3","att-c":"10","att-d":"10"}
  2. {"customer":"customer-2","att-a":"9","att-b":"7","att-c":"12","att-d":"4"}
  3. {"customer":"customer-3","att-a":"7","att-b":"3","att-c":"1","att-d":"10"}
  4. {"customer":"customer-4","att-a":"9","att-b":"14","att-c":"10","att-d":"4"}

Desired output:

```
+----------+----------------+
|customer  |similarity_score|
+----------+----------------+
|customer-1|-1              |
|customer-2|0               |
|customer-3|3               |
|customer-4|1               |
+----------+----------------+
```

Is a UDF the only way to do this? If so, I would like to keep my countSimilarColumns function as it is, so that it stays testable. How can that be done? I'm new to Spark/Scala.
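
One thing worth noting on the testability point: countSimilarColumns is a plain (Row, Row) => Int function, so it can already be unit-tested without a SparkSession by building Rows by hand. A minimal sketch, using a made-up two-attribute schema (not from the original thread):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical miniature schema, just for the test.
val schema = StructType(Seq("customer", "att-a", "att-b").map(StructField(_, StringType)))
val first  = new GenericRowWithSchema(Array("customer-1", "7", "3"), schema)
val second = new GenericRowWithSchema(Array("customer-3", "7", "4"), schema)

// Customers differ, so matching cells are counted: only att-a matches here.
assert(countSimilarColumns(first, second) == 1)
// The same customer on both sides yields the -1 sentinel.
assert(countSimilarColumns(first, first) == -1)
```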


Answer 1 (ego6inou)

What you need to fully appreciate is that all standard, non-aggregate operations on a DF only see one row at a time, so you have to think either from the perspective of a single row or of the whole set. Calling a function on the entire DF inside a single-row context such as withColumn is unusual. When you want to compare or combine rows with other rows, you typically join the table with another table (or with itself) and then work with the joined table.
What you want is something like this:

```scala
// Create a DF with the similarity score AND customer, which is treated as a row id here.
// (Assumes spark.implicits._ is in scope so the Row => tuple map has an encoder.)
val similarityDF = flattenData
  .map(row => row.getAs[String]("customer") -> countSimilarColumns(row, referenceCustomerRow))
  .toDF("customer", "similarity_score")

// Join your original DF with similarityDF on "customer".
flattenData.join(similarityDF, usingColumn = "customer").show()
```

This lets you match up the rows of the two DFs.
A UDF doesn't really fit here because, like the built-in functions, a UDF is applied to one (or more) specific columns to produce a value (which you can then use with withColumn), whereas here you want to look at the whole row.
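
That said, if you really want to stay with withColumn, a commonly used workaround (my addition, not from the original answer, and untested here) is to pack the whole row into a struct column and pass that to a UDF, since a struct column arrives in a Scala UDF as a Row:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

// The struct keeps the DataFrame's column names and order, so countSimilarColumns
// sees the same layout as referenceCustomerRow.
val scoreUdf = udf((row: Row) => countSimilarColumns(row, referenceCustomerRow))

flattenData
  .withColumn("similarity_score", scoreUdf(struct(flattenData.columns.map(col): _*)))
  .show()
```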


Answer 2 (nmpmafwu)

flattenData is of type DataFrame, and applying the map function to it gives you a Dataset.
You are passing the result of flattenData.map(row => countSimilarColumns(row, referenceCustomerRow)) to withColumn, but withColumn only accepts values of type org.apache.spark.sql.Column. So if you want to add the above result as a column without a UDF, you have to use the collect function and pass the result to lit. Check the code below.

```scala
flattenData
  .withColumn("similarity_score", lit(
    flattenData
      .map(row => countSimilarColumns(row, referenceCustomerRow))
      .collect
      .map(_.toInt)
  ))
```
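
A caveat worth noting here (my reading, not in the original answer): on Spark versions where lit accepts an Array, this builds a single array literal, so every row receives the entire score array rather than its own score, and collect pulls the whole Dataset onto the driver. If you need per-row alignment, the join in the first answer is the safer route.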

Alternatively, based on the sample data, you can add the logic below.

```scala
scala> df.show(false)
+-----+-----+-----+-----+----------+
|att-a|att-b|att-c|att-d|customer  |
+-----+-----+-----+-----+----------+
|7    |3    |10   |10   |customer-1|
|9    |7    |12   |4    |customer-2|
|7    |3    |1    |10   |customer-3|
|9    |14   |10   |4    |customer-4|
+-----+-----+-----+-----+----------+

scala> val conditions = df.columns.filterNot(_ == "customer").map(c => when(count(col(c)).over(Window.partitionBy(col(c)).orderBy(col(c).asc)) =!= 1, lit(1)).otherwise(0)).reduce(_ + _) // add 1 for each attribute whose value appears in more than one row, else 0
conditions: org.apache.spark.sql.Column = (((CASE WHEN (NOT (count(att-a) OVER (PARTITION BY att-a ORDER BY att-a ASC NULLS FIRST unspecifiedframe$()) = 1)) THEN 1 ELSE 0 END + CASE WHEN (NOT (count(att-b) OVER (PARTITION BY att-b ORDER BY att-b ASC NULLS FIRST unspecifiedframe$()) = 1)) THEN 1 ELSE 0 END) + CASE WHEN (NOT (count(att-c) OVER (PARTITION BY att-c ORDER BY att-c ASC NULLS FIRST unspecifiedframe$()) = 1)) THEN 1 ELSE 0 END) + CASE WHEN (NOT (count(att-d) OVER (PARTITION BY att-d ORDER BY att-d ASC NULLS FIRST unspecifiedframe$()) = 1)) THEN 1 ELSE 0 END)
```

Final result (note that this window-based score counts attribute values shared with any other customer, rather than matches against customer-1 specifically, which is why it differs from the desired output above):

```scala
scala> df.withColumn("similarity_score", conditions).show(false)
+-----+-----+-----+-----+----------+----------------+
|att-a|att-b|att-c|att-d|customer  |similarity_score|
+-----+-----+-----+-----+----------+----------------+
|9    |7    |12   |4    |customer-2|2               |
|7    |3    |1    |10   |customer-3|3               |
|7    |3    |10   |10   |customer-1|4               |
|9    |14   |10   |4    |customer-4|3               |
+-----+-----+-----+-----+----------+----------------+
```
