scala 基于Map类型键值对更新Spark Dataframe 列名

k4emjkb1  于 2023-08-05  发布在  Scala
关注(0)|答案(2)|浏览(130)

我有一个spark dataframe df。我需要根据Map类型的键值对来更新Spark Dataframe 的列名。

df.show()

   | col1|col2 |col3|
   |  2  |  Ive|1989|
   |Tom  | null|1981|
   |  4  | John|1991|

 Map_value = (col1 -> id, col2 -> name, col3 -> year)

字符串
需要帮助。我不知道该怎么办
预期的输出:

| id  | name|year|
   |  2  |  Ive|1989|
   |Tom  | null|1981|
   |  4  | John|1991|

sycxhyv7

sycxhyv71#

鉴于:

case class ColData(col1: String, col2: String, col3: Int)

字符串
在顶层定义:

val sourceSeq = Seq(
      ColData("2", "Ive", 1989),
      ColData("Tom", null, 1981),
      ColData("4", "John", 1991),
    )

    import sparkSession.implicits._

    def mapFields[T](ds: Dataset[T], fieldNameMap: Map[String, String]): DataFrame = {
      // make sure the fields are present - note this is not a free operation
      val fieldNames = ds.schema.fieldNames.toSet
      val newNames = fieldNameMap.filterKeys(fieldNames).map{ 
        case (oldFieldName, newFieldName) => col(oldFieldName).as(newFieldName)
      }.toSeq
      
      ds.select(newNames: _*)
    }

    val newNames = mapFields(sourceSeq.toDS(), Map("col1" -> "id", "col2" -> "name", "col3" -> "year", "not a field" -> "field"))

    newNames.show()


产生:

+---+----+----+
| id|name|year|
+---+----+----+
|  2| Ive|1989|
|Tom|null|1981|
|  4|John|1991|
+---+----+----+


注意事项:
fieldNames检查使用ds.schema,这可能非常昂贵,因此更喜欢使用已知字段而不是. schema。在大量字段上使用withColumn或withColumn重命名可能会严重影响性能,因为在生成的代码中并不是所有的投影都被删除了,所以希望尽可能保持投影的数量较低。

0pizxfdo

0pizxfdo2#

您可以使用withColumnRenamed来重命名列。
所以使用伪代码,代码应该是:

map_value.foreach((k,v) ->  df = df.withcolumnrenamed(k,v))

字符串
对于map中的每个key/value,在dataframe中,将列key重命名为新名称value

相关问题