I'm running into a problem saving a DataFrame to a Hive table with the following API call:
df.write.mode(SaveMode.Append).format("parquet").partitionBy("ord_deal_year", "ord_deal_month", "ord_deal_day").insertInto(tableName)
My DataFrame has about 48 columns, while the Hive table has 90. When I try to save the DataFrame, I get the following error:
12:56:11 Executor task launch worker-0 ERROR Executor:96 Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.ArrayIndexOutOfBoundsException: 51
at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.genericGet(rows.scala:253)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getAs(rows.scala:34)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.isNullAt(rows.scala:35)
at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.isNullAt(rows.scala:247)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:107)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
12:56:11 task-result-getter-3 WARN TaskSetManager:71 Lost task 0.0 in stage 3.0 (TID 3, localhost): java.lang.ArrayIndexOutOfBoundsException: 51
        (stack trace identical to the one above)
12:56:11 task-result-getter-3 ERROR TaskSetManager:75 Task 0 in stage 3.0 failed 1 times; aborting job
I tried adding the missing columns with the following snippet:
// `columns` is assumed to be a Seq[(String, String)] of (columnName, dataType);
// `lit` comes from org.apache.spark.sql.functions
val columnsAdded = columns.foldLeft(df) { case (d, c) =>
  if (d.columns.contains(c._1)) {
    // column exists; skip it
    d
  } else {
    // column is not available, so add it as a typed null
    d.withColumn(c._1, lit(null).cast(c._2))
  }
}
But the same error persists.
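For reference, one way to build the `columns` sequence used above is to derive it from the target table's own schema. This is a sketch: it assumes `spark` and `tableName` are in scope, and on Spark 1.x you would use `sqlContext.table` instead of `spark.table`.

```scala
// Derive (name, dataType) pairs from the Hive table's schema, so the
// foldLeft above fills in exactly the columns the table expects.
val columns: Seq[(String, String)] =
  spark.table(tableName).schema.map(f => (f.name, f.dataType.simpleString))
```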
I also checked the related question "Error while trying to save the data to Hive tables from Dataframe"; there, the cause was identified as the DataFrame having an incorrect schema compared to the Hive table. I verified my schema with:
// `hiveSchema` is assumed to be a Map[String, DataType] built from the Hive table
newDF.schema.map { i =>
  s"Column ${i.name},${i.dataType}" +
    s" Column exists in hive ${hiveSchema.get(i.name).isDefined}" +
    s" Hive Table has the correct datatype ${i.dataType == hiveSchema(i.name)}"
}.foreach(i => println(i))
Has anyone seen this problem, or have any suggestions on how to fix it?
2 Answers

Answer 1
I would explicitly select all the additional columns you need, filling in the missing attributes.
Another thing to watch out for is column order. Spark can write Parquet files that conform to the schema, but insertInto ignores the column names you used and matches by position. So if Hive has a: string, b: string and your Spark code produces a DataFrame with columns "b, a", it will write without error, but the values will land in the wrong columns.
Combining these two suggestions, I would add a guard clause that selects exactly the list of columns Hive has in its metadata, in the order it expects, right before the write out / insertInto.
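That guard clause could look roughly like the sketch below. It assumes `spark`, `df`, and `tableName` are in scope (on Spark 1.x, use `sqlContext.table` instead of `spark.table`): for each column of the Hive table, in Hive's order, take it from the DataFrame if present, otherwise substitute a typed null.

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit}

// Columns of the target Hive table, in the exact order Hive expects.
val hiveCols = spark.table(tableName).schema

// Take each Hive column from the DataFrame if it exists there,
// otherwise fill it with a typed null; select in Hive's order.
val aligned = df.select(hiveCols.map { f =>
  if (df.columns.contains(f.name)) col(f.name)
  else lit(null).cast(f.dataType).as(f.name)
}: _*)

// Since insertInto matches by position, `aligned` now lines up
// column-for-column with the table.
aligned.write.mode(SaveMode.Append).insertInto(tableName)
```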
Answer 2
When using insertInto, you should not use partitionBy: either way, those columns will be used for partitioning in Hive. Incidentally, DataFrame provides a nice way to print the schema out of the box: printSchema.
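Following that advice, the write from the question reduces to the sketch below; the partition columns are resolved from the Hive table's metadata rather than from partitionBy.

```scala
import org.apache.spark.sql.SaveMode

// insertInto picks up partitioning from the Hive table definition,
// so no partitionBy call is needed here.
df.write.mode(SaveMode.Append).insertInto(tableName)

// printSchema prints the DataFrame's schema as an indented tree,
// which is handy for comparing against the Hive table's columns.
df.printSchema()
```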