Spark: strange "Task not serializable" error

Asked by f87krz0w on 2021-05-26 in Spark

This does not work and complains about a serialization problem:

import java.nio.charset.StandardCharsets
import java.net.URLDecoder
import scala.util.Try

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

val df = List("http%3A%2F%2Fforum.krasmama.ru%2Fviewforum.php%3Ff%3D247").toDF("URL")

// Percent-decode a URL, returning null when decoding fails.
def parseURL(s: String): String = {
    Try(URLDecoder.decode(s, StandardCharsets.UTF_8.name())).toOption.orNull
}

// Wrap parseURL as a Spark UDF.
val parseURLudf = udf[String, String](parseURL)

// Binding the filter condition to a variable is what triggers the error below.
val myCond = col("URL").startsWith("http")
val df2 = df.filter(myCond)
val dfWithParsedUrl = df2.withColumn("URL", parseURLudf(col("URL")))
dfWithParsedUrl.show(5, truncate = 30)

But if I eliminate the myCond variable and paste col("URL").startsWith("http") directly into filter, it works. Why is that?
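
For reference, this is the inlined variant I mean, which runs without the error (a minimal sketch using the same df and parseURLudf as above; the new variable name is mine):

// Inlining the Column expression instead of binding it to a val:
val dfInlinedCond = df
  .filter(col("URL").startsWith("http"))
  .withColumn("URL", parseURLudf(col("URL")))
dfInlinedCond.show(5, truncate = 30)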
The error log from the failing version is as follows:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:661)
  ... 53 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: startswith(URL, http))
    - field (class: $iw, name: myCond, type: class org.apache.spark.sql.Column)
    - object (class $iw, $iw@720343cf)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[0, string, false]))
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
  ... 82 more

By the way, I don't know whether this can be reproduced locally, because in that case Spark does not necessarily serialize anything (if it only serializes when it needs to ship code from the driver to the executors; sorry, I don't know the details of how that works).
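
Reading the serialization stack above: the UDF's function ($anonfun$1) holds an $outer reference to the REPL line wrapper $iw, and that wrapper holds myCond, a non-serializable org.apache.spark.sql.Column, as a field. A minimal sketch of a variant that should avoid capturing the wrapper, assuming spark-shell on Spark 2.x / Scala 2.11 as in the trace (untested; the new names are mine):

// A function value is a plain lambda, so udf(...) does not eta-expand a
// method of the REPL wrapper; the body touches nothing in $iw, so no
// $outer (and hence no myCond) should end up in the serialized closure.
val parseURLFn: String => String = s =>
  Try(URLDecoder.decode(s, StandardCharsets.UTF_8.name())).toOption.orNull

val parseURLudf2 = udf(parseURLFn)

val myCond2 = col("URL").startsWith("http")
df.filter(myCond2)
  .withColumn("URL", parseURLudf2(col("URL")))
  .show(5, truncate = 30)

Moving parseURL into a top-level object compiled outside the shell is another commonly suggested way to keep the REPL wrapper out of the closure.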
