Unable to resolve Task not serializable [org.apache.spark.SparkException: Task not serializable] Spark Scala RDD

xxe27gdn  posted on 2021-05-29  in  Spark

When I try to create an object of a class and call a particular method, I keep getting the error stack trace below for `newRDD`.

I start a Spark shell with the jar on the classpath and run the following in spark-shell:

```
spark-shell --master=yarn --jars=sample_jar.jar --files database.cfg

scala> val reader = new Sample(spark)
scala> val a = reader.buildFileRDD("/xyz/path")

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at Sample.newRDDscala(Sample.scala:117)
... 48 elided
Caused by: java.io.NotSerializableException:
```

How can I resolve this error?

blmhpbnm  #1

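From the stack trace, it looks like you are using DatabaseUtils inside a closure. Because DatabaseUtils is not serializable, it cannot be transferred over the network, so try making DatabaseUtils serializable. Alternatively, you can make DatabaseUtils a Scala object.

```
.. DatabaseUtils extends Serializable
```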
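To see why `extends Serializable` matters, here is a minimal sketch that needs no Spark at all. It assumes that Spark's closure check amounts to Java-serializing the function before shipping it, which is what the `ClosureCleaner.ensureSerializable` frame in the trace suggests. `PlainDb`, `SerializableDb`, and `ClosureCheck` are hypothetical names made up for this illustration; they are not the classes from the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins for the helper class in the question.
class PlainDb(val url: String)                             // does not extend Serializable
class SerializableDb(val url: String) extends Serializable // the suggested fix

object ClosureCheck {
  // Tries to Java-serialize a value and reports whether that succeeded.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val plain = new PlainDb("jdbc:mysql://host/db")
    val ser = new SerializableDb("jdbc:mysql://host/db")

    // Each function captures the local helper it refers to.
    val capturesPlain: String => Boolean = line => line.contains(plain.url)
    val capturesSer: String => Boolean = line => line.contains(ser.url)

    println(isSerializable(capturesPlain)) // the captured PlainDb blocks serialization
    println(isSerializable(capturesSer))   // the captured SerializableDb goes through
  }
}
```

Serializing a closure serializes everything it captured, so one non-serializable helper is enough to make the whole task fail.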


vbopmzt1  #2

Change DatabaseUtils to the code below. In class Sample, remove the variables dbconfig and url, and add the following, which goes through the companion object's `apply`: `val dbObj = DatabaseUtils(ConfigFactory.parseFile(new File(config)))`

```
import java.sql.{Connection, DriverManager}
import scala.collection.mutable.ArrayBuffer
import com.typesafe.config.Config

class DatabaseUtils(url: String, username: String, password: String) {

  val driver = "com.mysql.jdbc.Driver"

  // Runs the query and collects the db_string column of every row.
  def executeSelectQuery(qry: String): List[String] = {
    val dbString = ArrayBuffer.empty[String]
    var conn: Connection = null
    try {
      Class.forName(driver)
      conn = DriverManager.getConnection(url, username, password)
      val statement = conn.createStatement
      val rs = statement.executeQuery(qry)

      while (rs.next) dbString += rs.getString("db_string")
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // conn is still null if getConnection failed, so guard the close.
      if (conn != null) conn.close()
    }
    dbString.toList
  }
}

object DatabaseUtils {
  // Builds the JDBC URL from the config and delegates to the class constructor.
  def apply(dbConfig: Config): DatabaseUtils = {
    val url = "jdbc:mysql://" + dbConfig.getString("db.host") + ":" + dbConfig.getString("db.port") + "/" + dbConfig.getString("db.database") + "?useSSL=false"
    new DatabaseUtils(url, dbConfig.getString("db.username"), dbConfig.getString("db.password"))
  }
}
```
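Which fields a class like Sample holds matters because a function that reads a field captures the whole enclosing instance. The sketch below shows that effect with plain Java serialization and no Spark. `Helper`, `Job`, and `CaptureDemo` are hypothetical names for this illustration, and copying the value into a local `val` is a common workaround, not something the answer above prescribes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable helper, standing in for a database utility.
class Helper(val prefix: String)

// Hypothetical driver-side class; it is not Serializable itself.
class Job(prefix: String) {
  private val helper = new Helper(prefix)

  // Reads the field inside the function, so the function captures `this` (the whole Job).
  def viaField: String => Boolean = line => line.startsWith(helper.prefix)

  // Copies the needed String into a local val first, so only that String is captured.
  def viaLocal: String => Boolean = {
    val p = helper.prefix
    line => line.startsWith(p)
  }
}

object CaptureDemo {
  // Tries to Java-serialize a value and reports whether that succeeded.
  def serializes(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val job = new Job("db_")
    println(serializes(job.viaField)) // drags in the non-serializable Job
    println(serializes(job.viaLocal)) // captures only a String
  }
}
```

If only a few plain values are needed inside `filter` or `map`, pulling them into local vals before the RDD call keeps the enclosing object out of the closure.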
