如果我运行这个代码,为什么spark返回“task not serializable”?

xfb7svmp  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(434)

我已经在spark中编写了一个简单的代码。从dataframe列中获取文件位置并返回字符串,不管它是否存在。但一旦我运行这个,它将抛出一个“任务不可序列化”。有人能帮我摆脱这个错误吗?

object filetospark{
  def main(args: Array[String])  : Unit = {
    val spark = SparkSession
      .builder()
      .appName("app1")
      .master("local")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      val path: String => String = (Path: String) => {

        val exists = fs.exists(new Path(Path))
        var result = " "
        if (exists) {
          result = "Y"
        }
        else {
          result = "N"
        }
        result
      }
      val PATH = udf(path)

    val config_df=spark.read.
      option("header","true").
      option("inferSchema","true").
      csv("pathlocation")
    val current_date=LocalDate.now()
    val instance_table_df=instance_df.withColumn("is_available",PATH(col("file_name")))

像这样的错误

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:613)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:746)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:705)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:714)
    at filetospark$.main(filetospark.scala:40)
    at filetospark.main(filetospark.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.fs.LocalFileSystem
Serialization stack:
    - object not serializable (class: org.apache.hadoop.fs.LocalFileSystem, value: org.apache.hadoop.fs.LocalFileSystem@7fd3fd06)
    - field (class: filetospark$$anonfun$1, name: fs$1, type: class org.apache.hadoop.fs.FileSystem)
    - object (class filetospark$$anonfun$1, <function1>)
    - element of array (index: 4)
    - array (class [Ljava.lang.Object;, size 5)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 36 more

它表明这个错误有人可以解决这个问题

sg3maiej

sg3maiej1#

object filetospark{
  val spark = SparkSession
    .builder()
    .appName("app1")
    .master("local")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val path: String => String = (Path: String) => {
    val exists = fs.exists(new Path(Path))
    var result = " "
    if (exists) {
      result = "Y"
    }
    else {
      print("N")
      result = "N"
    }
    result
  }
  def main(args: Array[String])  : Unit = {
    val PATH = udf(path)
    val newfu=udf(newfun)
    val config_df=spark.read.
      option("header","true").
      option("inferSchema","true").
      csv("filepath")
    val current_date=LocalDate.now()
    val instance_table_df=instance_df.withColumn("is_available",PATH(col("file_name")))
    instance_table_df.show()

  }
}

我不知道这里发生了什么。现在那个错误被澄清了。但我的怀疑还在这里。我只是在主函数外创建了spark会话。它工作正常。但是我不知道这里发生了什么。如果有人知道,请在这里发布。

相关问题