尝试在spark(scala)中收集sql查询结果时出错

okxuctiv  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(215)

我遵循spark中的一个基本教程,其中要求我们将一个.csv文件导入数据库,执行查询并收集查询结果。
为此,我们首先创建一个persona case类来包含数据,并创建一个函数来将数据集的每一行格式化为persona对象,如下所示:

case class Persona(ID:Int,nombre:String,edad:Int,numeroAmigos:Int)

def procesarPersona(linea:String):Persona = {
    val campos = linea.split(",")
    val persona = Persona(campos(0).toInt,campos(1),campos(2).toInt,campos(3).toInt)
    return persona
}

然后,我们创建spark sql会话并从.csv文件导入数据,如下所示:

val spark = SparkSession.builder.appName("Personas").getOrCreate()

val lineas = spark.sparkContext.textFile("file:///home/Eduardo/Downloads/friends.csv")

然后我们用函数Map每一行以处理每一行:

val personas = lineas.map(procesarPersona) //DATAFRAME RESULTADO

接下来,我们使用以下命令将dataframe转换为sql数据库:

val estructuraDatos = personas.toDS

estructuraDatos.printSchema

estructuraDatos.createOrReplaceTempView("personas")

然后我执行sql查询,并尝试收集数据,

val mayoresEdad = spark.sql("SELECT * FROM personas WHERE edad >= 18")

val resultados = mayoresEdad.collect()

问题是,前面的所有步骤都与视频中显示的结果相匹配,但是,在执行查询之后,我无法收集结果而不产生以下错误。
我得到的错误如下:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 8, 192.168.1.43, executor driver): java.lang.ClassCastException: class Persona cannot be cast to class Persona (Persona is in unnamed module of loader org.apache.spark.repl.ExecutorClassLoader @5337f0dc; Persona is in unnamed module of loader scala.tools.nsc.interpreter.IMain$TranslatingClassLoader @2229c7c)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

我已经检查了我的spark版本,而且我是正确的,我可能不需要我的教授在mac上使用以前的scala版本,但我怀疑这是原因。
你知道错误是什么吗?
我使用的是spark 3.0.2、Ubuntu20.04和zeppelin 0.8.9-bin-all。
谢谢你的帮助!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题