任务不能在databricks上的scala中序列化

hgqdbh6s  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(420)

我正在尝试使用scala在databricks中实现一个udf功能。即使在将函数封装在类中并继承可序列化的类之后,也会出现获取任务不可序列化的错误。请参考以下代码:

var rkList = List[String]("")

class appendData extends Serializable{
  var cKey = ""

  def addKey(data:String):String={
    if(data=="")
    {
      return cKey
    }
    else
    {
      cKey=data
      return cKey
    }
  }

  def execute(dframe: DataFrame): DataFrame ={
    val keyAddUDF = udf[String, String](addKey)

    var df = dframe.withColumn("r_c",substring(col("val"),0,6))
    df = df.withColumn("r_k",when(col("r_c")===kHolder, substring(col("val"),pos,len)).otherwise(""))
    rkList = df.select(col("r_k")).distinct.collect.map(_(0).toString).toList.filter(_ != "")

    return df.withColumn("val",concat(col("val"),keyAddUDF(col("r_k")))).drop("r_k","r_c")
  }
}

df = (new appendData).execute(df)
amrnrhlw

amrnrhlw1#

你不应该两个都放 execute 方法和udf方法在同一类中。定义 addKey 单独运行,如:

def addKey(data:String): String = {
    var rkList = List[String]("")
    var cKey = ""

    if(data=="") {
      return cKey
    } else {
      cKey=data
      return cKey
    }
}

val keyAddUDF = udf[String, String](addKey)

def transformDf(dframe: DataFrame): DataFrame ={
    var df = dframe.withColumn("r_c",substring(col("val"),0,6))
    df = df.withColumn("r_k",when(col("r_c")===kHolder, substring(col("val"),pos,len)).otherwise(""))
    rkList = df.select(col("r_k")).distinct.collect.map(_(0).toString).toList.filter(_ != "")
    return df.withColumn("val",concat(col("val"),keyAddUDF(col("r_k")))).drop("r_k","r_c")
}  

df = transformDf(df)

相关问题