我使用scala/spark将数据插入到hiveparquet表中,如下所示
for(*lots of current_Period_Id*){//This loop is on a result of another query that returns multiple rows of current_Period_Id
val myDf = hiveContext.sql(s"""SELECT columns FROM MULTIPLE TABLES WHERE period_id=$current_Period_Id""")
val count: Int = myDf.count().toInt
if(count>0){
hiveContext.sql(s"""INSERT INTO destinationtable PARTITION(period_id=$current_Period_Id) SELECT columns FROM MULTIPLE TABLES WHERE period_id=$current_Period_Id""")
}
}
这种方法需要花费大量的时间来完成,因为select语句执行了两次。
我试图避免两次选择数据,我想到的一种方法是将dataframe mydf直接写入表中。
这就是我试图使用的代码的要点
val sparkConf = new SparkConf().setAppName("myApp")
.set("spark.yarn.executor.memoryOverhead","4096")
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition","true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
for(*lots of current_Period_Id*){//This loop is on a result of another query
val myDf = hiveContext.sql("SELECT COLUMNS FROM MULTIPLE TABLES WHERE period_id=$current_Period_Id")
val count: Int = myDf.count().toInt
if(count>0){
myDf.write.mode("append").format("parquet").partitionBy("PERIOD_ID").saveAsTable("destinationtable")
}
}
但是我在mydf.write部分得到一个错误。
java.util.NoSuchElementException: key not found: period_id
目标表按时段id进行分区。
有人能帮我吗?
我使用的spark版本是1.5.0-cdh5.5.2。
1条答案
按热度按时间2vuwiymt1#
Dataframe架构和表的描述彼此不同。周期\u id!=period\u id列名在df中是大写的,但在表中是大写的。在sql中尝试使用小写句点\u id