Scala: reading data from a CSV whose columns contain empty values

jdg4fx2g posted on 2021-07-09 in Spark

Environment: spark-3.0.1-bin-hadoop2.7, Scala Library Container 2.12.3, Scala, Spark SQL, eclipse-jee-oxygen-2-linux-gtk-x86_64
I have a CSV file with 3 columns of data types string, long, and date. I have converted the CSV file into a DataFrame and want to display it, but it gives the following error:

  java.lang.ArrayIndexOutOfBoundsException: 2
    at org.apache.spark.examples.sql.SparkSQLExample5$.$anonfun$runInferSchemaExample$2(SparkSQLExample5.scala:30)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at this line of the Scala code:

  .map(attributes => Person(attributes(0), attributes(1), attributes(2))).toDF();

The error occurs when a subsequent row has fewer values than the header. Essentially, I am trying to read data from a CSV with Scala and Spark where some columns are empty, so the rows have differing numbers of columns. If every row has 3 column values, it runs successfully.
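As a quick illustration of the cause (my own sketch, not part of the original post): String.split(",") drops trailing empty strings, so a line ending in a comma yields only two elements and attributes(2) throws ArrayIndexOutOfBoundsException. Passing a negative limit to split keeps the trailing empty fields. A minimal, self-contained example assuming the same three-column layout:

  object SplitSketch {
    case class Person(name: String, age: String, birthDate: String)

    def main(args: Array[String]): Unit = {
      val line = "row21,row22,"                 // third column is empty

      println(line.split(",").length)           // 2 -- the trailing empty string is dropped
      println(line.split(",", -1).length)       // 3 -- a negative limit keeps trailing empty fields

      // With the negative limit, attributes(2) is simply "" instead of throwing
      val attributes = line.split(",", -1)
      println(Person(attributes(0), attributes(1), attributes(2)))   // Person(row21,row22,)
    }
  }

So in the program below, replacing .map(_.split(",")) with .map(_.split(",", -1)) would avoid the exception for rows that end in a comma; the answers below show other ways to handle it.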

  package org.apache.spark.examples.sql

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types._
  import java.sql.Date
  import org.apache.spark.sql.functions._
  import java.util.Calendar;

  object SparkSQLExample5 {
    case class Person(name: String, age: String, birthDate: String)

    def main(args: Array[String]): Unit = {
      val fromDateTime = java.time.LocalDateTime.now;
      val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.master", "local").getOrCreate();
      import spark.implicits._
      runInferSchemaExample(spark);
      spark.stop()
    }

    private def runInferSchemaExample(spark: SparkSession): Unit = {
      import spark.implicits._
      println("1. Creating an RDD of 'Person' objects and converting it into a DataFrame. " +
        " 2. Registering the DataFrame as a temporary view.")
      println("1. Third column of second row is not present. Last value of second row is a comma.")
      val peopleDF = spark.sparkContext
        .textFile("examples/src/main/resources/test.csv")
        .map(_.split(","))
        .map(attributes => Person(attributes(0), attributes(1), attributes(2))).toDF();
      val finalOutput = peopleDF.select("name", "age", "birthDate")
      finalOutput.show();
    }
  }
CSV file:

  col1,col2,col3
  row21,row22,
  row31,row32,

2hh7jdfx 1#

Input: CSV file

  col1,col2,col3
  row21,row22,
  row31,row32,

Code:

  import org.apache.spark.sql.SparkSession

  object ReadCsvFile {
    case class Person(name: String, age: String, birthDate: String)

    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.master", "local").getOrCreate();
      readCsvFileAndInferCustomSchema(spark);
      spark.stop()
    }

    private def readCsvFileAndInferCustomSchema(spark: SparkSession): Unit = {
      // spark.read.csv keeps missing trailing columns as null instead of dropping them
      val df = spark.read.csv("C:/Users/Ralimili/Desktop/data.csv")
      // drop the header row, which sits at the start of the first partition
      val rdd = df.rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
      val mapRdd = rdd.map(attributes => {
        Person(attributes.getString(0), attributes.getString(1), attributes.getString(2))
      })
      val finalDf = spark.createDataFrame(mapRdd)
      finalDf.show(false);
    }
  }

Output:

  +-----+-----+---------+
  |name |age  |birthDate|
  +-----+-----+---------+
  |row21|row22|null     |
  |row31|row32|null     |
  +-----+-----+---------+

If you want to fill in a placeholder value instead of null, use the code below:

  val customizedNullDf = finalDf.na.fill("No data")
  customizedNullDf.show(false);

Output:

  +-----+-----+---------+
  |name |age  |birthDate|
  +-----+-----+---------+
  |row21|row22|No data  |
  |row31|row32|No data  |
  +-----+-----+---------+

f87krz0w 2#

Try PERMISSIVE mode when reading the CSV file; it will add null for the missing fields:

  val df = spark.sqlContext.read.format("csv").option("mode", "PERMISSIVE")
    .load("examples/src/main/resources/test.csv")

You can find more information at https://docs.databricks.com/data/data-sources/read-csv.html
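A slightly fuller sketch of this approach (my own illustration; the column names, explicit schema, and extra options are assumptions based on the question, not part of the original answer). In PERMISSIVE mode, rows with missing or malformed fields are kept and the missing values come back as null; an explicit schema plus the header option makes that behaviour predictable:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types.{StringType, StructField, StructType}

  object PermissiveCsvSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("Permissive CSV sketch")
        .config("spark.master", "local")
        .getOrCreate()

      // Assumed three-column schema matching the question; everything is read as a string here
      val schema = StructType(Seq(
        StructField("name", StringType, nullable = true),
        StructField("age", StringType, nullable = true),
        StructField("birthDate", StringType, nullable = true)))

      val df = spark.read
        .format("csv")
        .option("header", "true")        // treat the first line of the file as the header
        .option("mode", "PERMISSIVE")    // keep short/malformed rows, filling missing fields with null
        .schema(schema)
        .load("examples/src/main/resources/test.csv")   // path taken from the question

      df.show(false)
      spark.stop()
    }
  }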
