spark-load循环表中的Dataframe内容

4ngedf3f  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(324)

我使用scala/spark将数据插入到hiveparquet表中,如下所示

for(*lots of current_Period_Id*){//This loop is on a result of another query that returns multiple rows of current_Period_Id
  val myDf = hiveContext.sql(s"""SELECT columns FROM MULTIPLE TABLES WHERE period_id=$current_Period_Id""")
  val count: Int = myDf.count().toInt
  if(count>0){
    hiveContext.sql(s"""INSERT INTO destinationtable PARTITION(period_id=$current_Period_Id) SELECT columns FROM MULTIPLE TABLES WHERE period_id=$current_Period_Id""")
  }
}

这种方法需要花费大量的时间来完成,因为select语句执行了两次。
我试图避免两次选择数据,我想到的一种方法是将dataframe mydf直接写入表中。
这就是我试图使用的代码的要点

val sparkConf = new SparkConf().setAppName("myApp")
                             .set("spark.yarn.executor.memoryOverhead","4096")
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)

hiveContext.setConf("hive.exec.dynamic.partition","true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
for(*lots of current_Period_Id*){//This loop is on a result of another query
  val myDf = hiveContext.sql("SELECT COLUMNS FROM MULTIPLE TABLES WHERE period_id=$current_Period_Id")
  val count: Int = myDf.count().toInt
  if(count>0){
    myDf.write.mode("append").format("parquet").partitionBy("PERIOD_ID").saveAsTable("destinationtable")
  }
}

但是我在mydf.write部分得到一个错误。

java.util.NoSuchElementException: key not found: period_id

目标表按时段id进行分区。
有人能帮我吗?
我使用的spark版本是1.5.0-cdh5.5.2。

2vuwiymt

2vuwiymt1#

Dataframe架构和表的描述彼此不同。周期\u id!=period\u id列名在df中是大写的,但在表中是大写的。在sql中尝试使用小写句点\u id

相关问题