我需要根据新列更新的df。但它没有更新新的列,它仍然给我旧列及其名称
val schema = "sku_cd#sku_code,ean_nbr#ean,vnr_cd#nan_key,dsupp_pcmdty_desc#pack_descr"
val schemaArr = schema.split(",")
var df = spark.sql("""select sku_code, ean , nan_key, pack_descr from db.products""")
val updatedDF = populateAttributes(df,schemaArr)
def populateAttributes(df:DataFrame,schemaArr:Array[String]) : DataFrame = {
for(i <- schemaArr)
{
val targetCol = i.split("#")(0)
val sourceCol = i.split("#")(1)
df.withColumn(targetCol, col(sourceCol))
}
df
}
我得到低于输出这是不正确的
scala> updatedDF.printSchema
root
|-- sku_code: string (nullable = true)
|-- ean: string (nullable = true)
|-- nan_key: string (nullable = true)
|-- pack_descr: string (nullable = true)
预期产量
|-- sku_cd: string (nullable = true)
|-- ean_nbr: string (nullable = true)
|-- vnr_cd: string (nullable = true)
|-- dsupp_pcmdty_desc: string (nullable = true)
2条答案
按热度按时间nnt7mjpx1#
您没有更新for循环中的Dataframe。线路:
将创建一个新的Dataframe
df
将保持不变。你可以用
var
以便在每次迭代中重新分配原始Dataframe。同时使用withColumnRenamed
要重命名列,请执行以下操作:或者更好,使用
foldLeft
:使用选择表达式的另一种方法:
bjp0bcyl2#
只是另一种方式去做Blackishop做的事