无法读取并稍后查询apache spark中的文本文件

ua4mk5z4  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(423)

因此,我尝试使用我们提供的数据集来实现示例spark编程示例。它是一个用 | . 但是,即使在遵循给定的指令之后,它也会抛出以下错误。
我可以看到它无法将一个示例的对象“强制”到另一个示例中,任何关于如何处理该场景的建议。

Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

还有一个附属问题,什么是Parquet地板?
编辑:所以我仍然不确定哪里出了问题,现在我已经转到另一个项目。但我觉得我试图吸收的数据有些邪恶。请不要对这个问题投反对票。一旦我对问题有了更清晰的理解,我会接受下面的答案,或者自己回答问题(如果是这样的情况)。

gev0vcfq

gev0vcfq1#

要使用cast api,需要使用$“columnname”.cast()api对Dataframe内的列对象调用它
parquet是hadoop常用的文件格式。它是一种列数据存储格式。这意味着我们将列而不是行存储在一起。这将有助于后续的阅读,只需要我们阅读重要的列。因此,如果您有10列的表,并且您只想读取其中的2列,那么我们可以利用parquet(或orc)格式,只读取重要列,而跳过其他8列。

0wi1tuuw

0wi1tuuw2#

有更好的选项可用于读取分隔文件。你只需要额外的图书馆。
这方面有很好的文件。检查此链接。
在java中

Dataset<Row> people  =  spark.read()
                .format("com.databricks.spark.csv")
                .schema(customSchema)    
                .option("header", "true").option("delimiter", "|")
                .load("file.csv");

在斯卡拉

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .schema(customSchema)
    .option("delimiter", "|")
    .load("file.csv")

相关问题