如何在spark scalaDataframe中基于源列动态添加列

r8xiu3jd  于 2021-07-14  发布在  Spark
关注(0)|答案(2)|浏览(313)

我需要根据新列更新的df。但它没有更新新的列,它仍然给我旧列及其名称

val schema = "sku_cd#sku_code,ean_nbr#ean,vnr_cd#nan_key,dsupp_pcmdty_desc#pack_descr" 

val schemaArr = schema.split(",")

var df = spark.sql("""select sku_code, ean , nan_key, pack_descr from db.products""")

val updatedDF = populateAttributes(df,schemaArr)

 def populateAttributes(df:DataFrame,schemaArr:Array[String]) : DataFrame = {
 for(i <- schemaArr)
    {
          val targetCol = i.split("#")(0)
          val sourceCol = i.split("#")(1)
          df.withColumn(targetCol, col(sourceCol))
     }
      df
   }

我得到低于输出这是不正确的

scala> updatedDF.printSchema
 root
 |-- sku_code: string (nullable = true)
 |-- ean: string (nullable = true)
 |-- nan_key: string (nullable = true)
 |-- pack_descr: string (nullable = true)

预期产量

|-- sku_cd: string (nullable = true)
 |-- ean_nbr: string (nullable = true)
 |-- vnr_cd: string (nullable = true)
 |-- dsupp_pcmdty_desc: string (nullable = true)
nnt7mjpx

nnt7mjpx1#

您没有更新for循环中的Dataframe。线路:

df.withColumn(targetCol, col(sourceCol))

将创建一个新的Dataframe df 将保持不变。
你可以用 var 以便在每次迭代中重新分配原始Dataframe。同时使用 withColumnRenamed 要重命名列,请执行以下操作:

df = df.withColumnRenamed(sourceCol, targetCol)

或者更好,使用 foldLeft :

def populateAttributes(df:DataFrame,schemaArr:Array[String]) : DataFrame = {

 schemaArr.foldLeft(df)((acc, m) => {
     val mapping = m.split("#")
     acc.withColumnRenamed(mapping(1), mapping(0))
 })
}

使用选择表达式的另一种方法:

val selectExpr = schemaArr.map(m => {
  val mapping = m.split("#")
  col(mapping(1)).as(mapping(0))
})

val updatedDF = df.select(selectExpr:_*)
bjp0bcyl

bjp0bcyl2#

只是另一种方式去做Blackishop做的事

val schema = "sku_cd#sku_code,ean_nbr#ean,vnr_cd#nan_key,dsupp_pcmdty_desc#pack_descr" 

val schemaArr = schema.split(",").toSeq

val outputDF=schemaArr.foldLeft(inputDF)((df,x)=>df.withColumnRenamed(x,x.split('#')(0)))

相关问题