I am trying to alter a Hive table with Spark, i.e. add or drop columns on the Hive table based on a Spark DataFrame's output. Below is what I have tried - it's a fairly large piece of code:
import org.apache.spark.sql.{SaveMode, SparkSession, functions}
import org.apache.spark.sql.types.StructField

object SchemaHandle {

  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder
      .appName("SchemaHandle")
      .enableHiveSupport
      .getOrCreate

    // Assume below is my generated DataFrame.
    import spark.implicits._
    val dfSample = Seq(
      (12, "Dallas", "Texas", 55, "BOOK S", "hello", "Hellotwo"),
      (12, "SF", "CA", 25, "RULER", "hello", "Hellotwo"),
      (13, "NYC", "NY", 53, "PENCIL S", "hello", "Hellotwo"),
      (14, "Miami", "Fl", 45, "RULER", "hello", "Hellotwo"),
      (12, "Houston", "Texas", 75, "MARKER", "hello", "Hellotwo"),
      (11, "jersey", "NJ", 53, "WHITE NE R", "hello", "Hellotwo"),
      (19, "new orleans", "LO", 45, "HIGHLIGHTNER", "hello", "Hellotwo")
    ).toDF("id", "city", "state", "qty", "item", "columnone", "columntwo")

    try {
      spark.sql("truncate table database.schematest")
      println("Successfully truncated database.schematest")
    } catch {
      case _: Throwable =>
        println("This job is running for the very first time, so there is no table to truncate - we'll create the table below")
        dfSample.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable("database.schematest")
        println("Output table saved to database.schematest")
    }

    // The Spark DataFrame schema.
    val seqone: Seq[StructField] = dfSample.schema
    // The existing table schema.
    val seqtwo: Seq[StructField] = spark.table("database.schematest").schema
    // Columns (with schema) to be added.
    val diffedSeq = seqone diff seqtwo
    // Columns (with schema) to be dropped.
    val diffedSeqTwo = seqtwo diff seqone
    // Column names, to diff by name alone.
    val seqonecolumns = dfSample.columns
    val seqtwocolumns = spark.table("database.schematest").columns
    val diffedSeqArrayOne = seqonecolumns diff seqtwocolumns
    val diffedSeqArrayTwo = seqtwocolumns diff seqonecolumns

    // Build the "name type" pairs for the ADD COLUMNS statement.
    var fixedAlterColumns: String = ""
    for (i <- diffedSeqArrayOne) {
      for (j <- diffedSeq) {
        if (i.equals(j.name)) {
          fixedAlterColumns += j.name + " " + datatypeCheckFunction(j.dataType.toString) + ","
        }
      }
    }

    if (fixedAlterColumns.length > 0) {
      println(s"Result---> ${fixedAlterColumns.substring(0, fixedAlterColumns.length - 1)}")
      // Let's add the new columns to database.schematest.
      spark.sql(s"ALTER TABLE database.schematest ADD COLUMNS (${fixedAlterColumns.substring(0, fixedAlterColumns.length - 1)})")
      println("Alter Table Success")
    } else {
      println("No Columns to Add")
    }

    println("------------------------------BREAK---------------------------")

    // Now let's think about dropping the columns.
    val dfSampleCurrentTable: Seq[StructField] = spark.table("database.schematest").schema
    // Since we cannot drop columns from a Hive table, let's do REPLACE COLUMNS.
    val dfSampleFinalDiff = dfSampleCurrentTable diff diffedSeqTwo
    dfSampleFinalDiff.foreach(println)
    val dfSampleFinalDiffColArray = spark.table("database.schematest").columns diff diffedSeqArrayTwo
    dfSampleFinalDiffColArray.foreach(println)

    // Build the "name type" pairs for the columns we want to keep.
    var fixedDropColumns: String = ""
    for (i <- dfSampleFinalDiffColArray) {
      println("The i is " + i)
      for (j <- dfSampleCurrentTable) {
        println("This is j " + j)
        if (i.equals(j.name)) {
          fixedDropColumns += j.name + " " + datatypeCheckFunction(j.dataType.toString) + ","
        }
      }
    }

    // Let's drop the columns that aren't required.
    if (fixedDropColumns.length > 0) {
      println(s"Result---> ${fixedDropColumns.substring(0, fixedDropColumns.length - 1)}")
      spark.sql(s"ALTER TABLE database.schematest REPLACE COLUMNS (${fixedDropColumns.substring(0, fixedDropColumns.length - 1)})")
      println("Alter Drop Table Success")
    } else {
      println("No Columns to Drop")
    }

    // Now let's append the DataFrame output into the table.
    dfSample.withColumn("mybool", functions.lit(null)).coalesce(50).write.format("parquet").mode(SaveMode.Append).insertInto("database.schematest")
    println("Saving output Table Successful.")
  }

  // Map Spark data type names to Hive column types.
  def datatypeCheckFunction(datatypePassed: String): String = {
    datatypePassed match {
      case "BinaryType" | "ByteType" | "DateType" | "NullType" | "StringType" | "TimestampType" => "String"
      case "BooleanType" => "boolean"
      case "DoubleType" | "FloatType" => "Double"
      case "IntegerType" | "ShortType" => "Int"
      case "LongType" => "BigInt"
      case _ => "String"
    }
  }
}
I understand there is room for optimization, but at least with this code I am seeing two problems: 1. When I run the above Spark job, ADD COLUMNS succeeds but REPLACE COLUMNS fails with the following:
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
Operation not allowed: ALTER TABLE REPLACE COLUMNS(line 1, pos 0)
== SQL ==
ALTER TABLE database.schematest REPLACE COLUMNS(id Int,city String,state String,qty Int,item String,columnone String,columntwo String)
2. Assuming REPLACE COLUMNS had worked, would it also have deleted the data of the dropped column?
Below is the Hive CREATE TABLE statement I used:
create table schematest(`id` int, `city` string, `state` string, `qty` int, `mybool` boolean) stored as parquet
Any help would be greatly appreciated.
1 Answer
I just came across this passage in the Hive manual:
REPLACE COLUMNS removes all existing columns and adds the new set of columns. This can be done only for tables with a native SerDe (DynamicSerDe, MetadataTypedColumnsetSerDe, LazySimpleSerDe and ColumnarSerDe).
So as far as I can tell, Parquet does not seem to be supported.
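Since REPLACE COLUMNS is apparently off the table for Parquet-backed tables, one workaround is to rewrite the table with the desired schema instead of altering it in place. Below is a minimal sketch of that idea; the helper name dropColumnsByRewrite and the "_tmp" staging-table suffix are my own inventions, and it assumes a full rewrite of the table is acceptable:

import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch: emulate "drop columns" on a Parquet table by rewriting it.
// The staging-table suffix "_tmp" is a made-up convention for this example.
def dropColumnsByRewrite(spark: SparkSession, table: String, colsToDrop: Seq[String]): Unit = {
  val tmpTable = table + "_tmp"
  // Write the narrowed data to a staging table first, because Spark
  // refuses to overwrite a table that is being read in the same plan.
  spark.table(table)
    .drop(colsToDrop: _*)
    .write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tmpTable)
  // Swap the staging table into place.
  spark.sql(s"DROP TABLE IF EXISTS $table")
  spark.sql(s"ALTER TABLE $tmpTable RENAME TO $table")
}

// e.g. dropColumnsByRewrite(spark, "database.schematest", Seq("columnone", "columntwo"))

Note that, unlike a metadata-only ALTER TABLE, this rewrite also physically removes the dropped columns' data from the files. That indirectly answers question 2 above: as far as I understand, REPLACE COLUMNS (where it is supported) only changes the table metadata and leaves the underlying data files untouched.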