scala 当spark将json字符串转换为spark中的HashMap时,会发生异常

koaltpgm  于 2023-02-16  发布在  Scala
关注(0)|答案(1)|浏览(324)

在本地环境中没有问题,但是在执行spark提交时出现异常。
近似代码如下所示

class Test extends Serializable {
     def action() = {
         val sc = SparkContext.getOrCreate(sparkConf)
         val rdd1 = sc.textFile(.. )
         val rdd2 = rdd1.map ( logline => {
             //gson
             val jsonObject  jsonParser.parse(logLine).getAsJsonObject
             //jackson
             val jsonObject = objectMapper.readValue(logLine,classOf[HashMap[String,String]])
             MyDataSet ( parsedJson.get("field1"), parsedJson.get("field2"),...)              
         }                                  
     }
}

例外情况

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
               ensureSerializable(ClosureCleanser. scala:444)..
               ........
               ........
               caused by: java.io.NotSerializableException : com.fasterxml.jackson.module.scala.modifiers.ScalaTypeModifier

gson和Jackson库我都用过。
这难道不是一个可以通过从serializable继承来解决的问题吗?

oknwwptz

oknwwptz1#

异常NotSerializableException是非常不言自明的。你的任务是不可序列化的。Spark是一个并行计算引擎。驱动程序(执行主程序的地方)将您想要在RDD上进行的转换(在map函数中编写的代码)转换到执行它们的执行器。因此,这些转换需要是可序列化的。在您的情况下,jsonParserobjectMapper是在驱动程序上创建的,为了在转换中使用它们,spark尝试将它们序列化,但失败了,因为它们不可序列化,这是您的错误,我不知道哪个不可序列化,可能两个都不可序列化。
让我们举一个例子,看看我们可以如何修复它。

// let's create a non serializable class
class Stuff(val i : Int) {
    def get() = i
}
// we instantiate it in the driver
val stuff = new Stuff(4)

//this fails "Caused by: java.io.NotSerializableException: Stuff"
val result = sc.parallelize(Seq(1, 2,3)).map( x => (x, stuff.get)).collect

为了解决这个问题,我们在变形中创建一个对象

val result = sc.parallelize(Seq(1, 2,3))
    .map( x => {
        val new_stuff = new Stuff(4)
        (x, new_stuff.get)
    }).collect

这是可行的,但显然,为每条记录创建对象的开销会相当大,我们可以使用mapPartition做得更好,每个分区只创建一次对象:

val result = sc.parallelize(Seq(1, 2,3))
    .mapPartitions(part => {
         val new_stuff = new Stuff(4)
         part.map( x => (x, new_stuff.get))
    }).collect

相关问题