这不起作用,并抱怨序列化问题:
import java.nio.charset.StandardCharsets
import scala.util.Try
import java.net.URLDecoder
import spark.implicits._
val df = List("http%3A%2F%2Fforum.krasmama.ru%2Fviewforum.php%3Ff%3D247").toDF("URL")
def parseURL(s: String): String = {
Try(URLDecoder.decode(s, StandardCharsets.UTF_8.name())).toOption.getOrElse(null)
}
val parseURLudf = udf[String, String](parseURL)
val myCond = col("URL").startsWith("http")
val df2 = df.filter(myCond)
val dfWithParsedUrl = df2.withColumn("URL", parseURLudf(col("URL")))
dfWithParsedUrl.show(5, truncate=30)
但如果我消除了 myCond
变量和粘贴 col("URL").startsWith("http")
直接进入 filter
,它工作,但为什么?
错误日志如下:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
at org.apache.spark.sql.Dataset.show(Dataset.scala:661)
... 53 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: startswith(URL, http))
- field (class: $iw, name: myCond, type: class org.apache.spark.sql.Column)
- object (class $iw, $iw@720343cf)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[0, string, false]))
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
... 82 more
顺便说一句,我不知道这是否可以在本地复制,因为在这种情况下,spark不一定序列化任何东西(如果它只在需要将代码从驱动程序发送到执行程序时序列化,对不起,我不知道它如何工作的细节)。
暂无答案!
目前还没有任何答案,快来回答吧!