I have a generic piece of code that, given a set of key columns and a DataFrame, finds the duplicates for that key set in the DataFrame.
Code that does not work:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, count}

case class DuplicateRecord(
  datasetName: String,
  duplicateKeys: String,
  duplicateCount: Long
)

def findDuplicatesInDF(
    spark: SparkSession,
    inputName: String,
    inputDataFrame: DataFrame,
    groupColumns: Seq[String]): Dataset[DuplicateRecord] = {
  import spark.implicits._
  val keys = groupColumns.map(x => col(x))
  // Count the number of rows per key combination
  val idToCounts = inputDataFrame
    .groupBy(keys: _*)
    .agg(count(keys(0)).as("duplicateKeyCount"))
  // Keep only keys occurring more than once and turn each Row into a DuplicateRecord
  idToCounts
    .filter(col("duplicateKeyCount") > 1)
    .map { idToCount =>
      DuplicateRecord(
        inputName,
        groupColumns.map(x => idToCount.getAs[Any](x).toString).mkString(","),
        idToCount.getAs[Any]("duplicateKeyCount").toString.toLong)
    }
}
The above code runs fine locally. However, it fails in production with:
Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:375)
Code that works:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws, count}

case class DuplicateRecord(
  datasetName: String,
  duplicateKeys: String,
  duplicateCount: Long
)

case class IdToCounts(
  mergedKey: String,
  duplicateKeyCount: Long
)

def findDuplicatesInDF(
    spark: SparkSession,
    inputName: String,
    inputDataFrame: DataFrame,
    groupColumns: Seq[String]): Dataset[DuplicateRecord] = {
  import spark.implicits._
  val keys = groupColumns.map(x => col(x))
  // Merge the key columns into a single comma-separated string column,
  // then count the number of rows per merged key
  val idToCounts = inputDataFrame
    .withColumn("mergedKey", concat_ws(",", keys: _*))
    .groupBy(col("mergedKey"))
    .agg(count(col("mergedKey")).as("duplicateKeyCount"))
    .as[IdToCounts]
  // Keep only merged keys occurring more than once
  idToCounts
    .filter(idToCount => idToCount.duplicateKeyCount > 1)
    .map { idToCount =>
      DuplicateRecord(inputName, idToCount.mergedKey, idToCount.duplicateKeyCount)
    }
}
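For reference, a minimal usage sketch of the working version (the sample data, dataset name, and column names below are made up for illustration; it assumes an active SparkSession named spark):

import spark.implicits._

val df = Seq(("a", 1), ("a", 1), ("b", 2)).toDF("id", "value") // hypothetical sample data
val duplicates = findDuplicatesInDF(spark, "myDataset", df, Seq("id", "value"))
duplicates.show()
// +-----------+-------------+--------------+
// |datasetName|duplicateKeys|duplicateCount|
// +-----------+-------------+--------------+
// |  myDataset|          a,1|             2|
// +-----------+-------------+--------------+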
I know this has something to do with Spark running in local mode on a single JVM locally, whereas in prod there are multiple executors and the data is partitioned, which seems to lead to some non-deterministic behavior where Spark cannot work out where to pull the data from to complete the operation. But I would like to understand the exact problem; the existing Stack Overflow questions related to this do not have a convincing answer. Any insight into this would be very helpful! Thanks!
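For context, here is a minimal sketch of one well-documented way to hit this exact exception (an illustration, not necessarily a diagnosis of the code above): "A master URL must be set in your configuration" is thrown whenever a new SparkContext is constructed without spark.master being set, which is what happens when task code running on an executor tries to build its own SparkSession. In local mode everything shares one JVM, so the driver's existing session is silently reused and the code appears to work.

import org.apache.spark.sql.SparkSession

object MasterUrlRepro { // hypothetical repro, not part of the original code
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("repro").getOrCreate()
    import spark.implicits._

    Seq(1, 2, 3).toDS()
      .map { n =>
        // This closure runs on an executor. In local mode the active session
        // in the same JVM is reused, so it "works". On a cluster there is no
        // active session in the executor JVM, so getOrCreate() tries to build
        // a new SparkContext with no master URL configured and throws:
        // org.apache.spark.SparkException: A master URL must be set in your configuration
        val inner = SparkSession.builder().getOrCreate()
        n + inner.sparkContext.defaultParallelism
      }
      .show()
  }
}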