I'm trying to load a CSV as a DataFrame of a custom object type:
case class Geom(attributes: Map[String, Any])
This is what I tried:
import session.implicits._

implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
implicit val geomEncoder = org.apache.spark.sql.Encoders.product[Geom]

val sparkSQLGeometryRDD = session.read
  .option("delimiter", "\t")
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("src\\main\\resources\\TexasPostCodes.txt")
  //.as[MyObjEncoded]//(encoder)
  .persist()

val columns = sparkSQLGeometryRDD.schema.fieldNames
//sparkSQLGeometryRDD.show()

val mappedDF = sparkSQLGeometryRDD
  .map(x => x.getValuesMap[Any](columns.toList))
  .map(x => Geom(x))
  .show
But it throws an exception:
Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
Can someone help me figure out what's wrong with my code?
It works fine after moving the case class and the encoders out of the method:
object SpatialEncoders {
  implicit def MapEncoder: Encoder[Map[String, Any]] = Encoders.kryo[Map[String, Any]]
  implicit def GeomEncoder: Encoder[Geom] = Encoders.kryo[Geom]
  implicit def SparkSQLGeometryEncoder: Encoder[SparkSQLGeometry] = Encoders.kryo[SparkSQLGeometry]
}
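A minimal sketch of how these top-level definitions can be wired together at a call site (assuming a `SparkSession` named `session` and the same tab-delimited file; `Demo` and `run` are hypothetical names for illustration, and `SparkSQLGeometry` is the asker's own type, omitted here):

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Defined at top level, not inside a method: Spark's encoder machinery
// cannot resolve classes that are local to a method body.
case class Geom(attributes: Map[String, Any])

object SpatialEncoders {
  // Kryo encoders, because Map[String, Any] has no product/primitive
  // encoder — scala.Any is what the ClassNotFoundException complains about.
  implicit def mapEncoder: Encoder[Map[String, Any]] = Encoders.kryo[Map[String, Any]]
  implicit def geomEncoder: Encoder[Geom] = Encoders.kryo[Geom]
}

object Demo {
  import SpatialEncoders._ // bring the implicit encoders into scope

  def run(session: SparkSession): Unit = {
    val df = session.read
      .option("delimiter", "\t")
      .option("header", "true")
      .csv("src\\main\\resources\\TexasPostCodes.txt")

    val columns = df.schema.fieldNames

    // map now resolves the implicit kryo Encoder[Geom] instead of
    // trying (and failing) to derive one involving scala.Any.
    val geoms = df.map(row => Geom(row.getValuesMap[Any](columns.toList)))
    geoms.show()
  }
}
```

The key point is that the encoders are `implicit def`s on a top-level object, so a single `import SpatialEncoders._` makes them available wherever the `.map` call needs them.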