ArrayIndexOutOfBoundsException when saving a DataFrame to Hive

w8ntj3qf · asked on 2021-06-26 in Hive

I'm running into a problem saving a DataFrame to a Hive table using the following API call:

df.write.mode(SaveMode.Append).format("parquet").partitionBy("ord_deal_year", "ord_deal_month", "ord_deal_day").insertInto(tableName)

My DataFrame has about 48 columns, while the Hive table has 90. When I try to save the DataFrame, I get the following error:

12:56:11 Executor task launch worker-0 ERROR Executor:96  Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.ArrayIndexOutOfBoundsException: 51
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.genericGet(rows.scala:253)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getAs(rows.scala:34)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.isNullAt(rows.scala:35)
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.isNullAt(rows.scala:247)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:107)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
12:56:11 task-result-getter-3 WARN  TaskSetManager:71  Lost task 0.0 in stage 3.0 (TID 3, localhost): java.lang.ArrayIndexOutOfBoundsException: 51
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.genericGet(rows.scala:253)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getAs(rows.scala:34)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.isNullAt(rows.scala:35)
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.isNullAt(rows.scala:247)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:107)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

12:56:11 task-result-getter-3 ERROR TaskSetManager:75  Task 0 in stage 3.0 failed 1 times; aborting job

I tried adding the missing columns with the following snippet:

val columnsAdded = columns.foldLeft(df) { case (d, c) =>
  if (d.columns.contains(c._1)) {
    // column exists; skip it
    d
  } else {
    // column is not available so add it
    d.withColumn(c._1, lit(null).cast(c._2))
  }
}
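
For reference, the columns value used above isn't shown in the post; presumably it pairs each Hive column name with its type. A minimal sketch of how it could be built, assuming a sqlContext bound to the same metastore:

import org.apache.spark.sql.types.DataType

// Assumption: derive (name, type) pairs from the target Hive table's
// schema, so the foldLeft above can add whatever the DataFrame lacks.
val columns: Seq[(String, DataType)] =
  sqlContext.table(tableName).schema.map(f => (f.name, f.dataType))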

But the same problem persists.
I also checked a similar question about errors when saving data from a DataFrame to a Hive table, where the cause was determined to be a schema mismatch between the DataFrame and the Hive table. I verified my schema with:

// hiveSchema is assumed to be a Map[String, DataType] of the Hive table's columns
newDF.schema.map { field =>
  s"Column ${field.name},${field.dataType}" +
    s" Column exists in hive ${hiveSchema.get(field.name).isDefined}" +
    s" Hive Table has the correct datatype ${hiveSchema.get(field.name).exists(_ == field.dataType)}"
}.foreach(println)

Has anyone seen this issue, or does anyone have suggestions on how to fix it?

ldfqzlk8 #1

I would explicitly select all the additional columns you need in order to fill in the missing attributes. The ArrayIndexOutOfBoundsException: 51 likely reflects exactly this gap: the insert path iterates over all 90 Hive columns while each row carries only 51 fields (48 data columns plus the 3 partition columns).
Another thing to watch out for is that you need the columns in the correct order. Spark can write Parquet files that conform to the schema, but it ignores the column names you use. So if Hive has a: String, b: String and your Spark code produces a DataFrame with columns "b, a", it will write just fine, but the columns will be in the wrong order.
Combining these two suggestions, I would add a guard clause that selects exactly the list of columns Hive holds in its metadata, in the order it expects, right before the write/insertInto.
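
A minimal sketch of that guard clause, assuming the sqlContext and the columnsAdded DataFrame from the question (names are illustrative):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Select exactly the columns Hive declares, in Hive's order, so the
// positional mapping done by insertInto lines up with the table.
val hiveColumns = sqlContext.table(tableName).columns
val aligned = columnsAdded.select(hiveColumns.map(col): _*)
aligned.write.mode(SaveMode.Append).insertInto(tableName)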

pgpifvop #2

When you use insertInto, you don't need partitionBy; either way, those columns will be used for partitioning in Hive.
By the way, DataFrame offers a nice out-of-the-box way to print the schema: printSchema.
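
Applied to the write from the question, that would look roughly like this (a sketch; the partition columns must still be present as the trailing columns of the DataFrame, in the table's order):

import org.apache.spark.sql.SaveMode

// insertInto uses the table's existing format and partitioning,
// so both format("parquet") and partitionBy(...) can be dropped.
df.write.mode(SaveMode.Append).insertInto(tableName)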
