How to rename a DataFrame's columns in Spark without reassigning it?

p1tboqfb · asked on 2021-05-27 · Spark

I have a DataFrame named dataDF whose columns I want to rename. Another DataFrame, mapDF, holds an "original_name" -> "code_name" mapping, and I want to rename dataDF's columns from "original_name" to "code_name" according to those values. I tried reassigning dataDF inside a loop, but with large data volumes this performs poorly and loses parallelism. For a huge dataDF dataset, is there a better way to do this that keeps parallelism and good performance?

import sparkSession.sqlContext.implicits._

var dataDF = Seq((10, 20, 30, 40, 50), (100, 200, 300, 400, 500), (10, 222, 333, 444, 555), (1123, 2123, 3123, 4123, 5123), (1321, 2321, 3321, 4321, 5321))
  .toDF("col_1", "col_2", "col_3", "col_4", "col_5")
dataDF.show(false)

val mapDF = Seq(("col_1", "code_1", "true"), ("col_3", "code_3", "true"), ("col_4", "code_4", "true"), ("col_5", "code_5", "true"))
  .toDF("original_name", "code_name", "important")
mapDF.show(false)

// Collect the original_name -> code_name pairs to the driver as a Map.
val map_of_codename = mapDF.rdd.map(x => (x.getString(0), x.getString(1))).collectAsMap()

// Rename column by column, reassigning dataDF on each iteration.
dataDF.columns.foreach { x =>
  if (map_of_codename.contains(x))
    dataDF = dataDF.withColumnRenamed(x, map_of_codename(x))
  else
    dataDF = dataDF.withColumnRenamed(x, "None")
}
dataDF.show(false)

========================
dataDF
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10   |20   |30   |40   |50   |
|100  |200  |300  |400  |500  |
|10   |222  |333  |444  |555  |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+

mapDF
+-------------+---------+---------+
|original_name|code_name|important|
+-------------+---------+---------+
|col_1        |code_1   |true     |
|col_3        |code_3   |true     |
|col_4        |code_4   |true     |
|col_5        |code_5   |true     |
+-------------+---------+---------+

expected DF:
+------+----+------+------+------+
|code_1|None|code_3|code_4|code_5|
+------+----+------+------+------+
|10    |20  |30    |40    |50    |
|100   |200 |300   |400   |500   |
|10    |222 |333   |444   |555   |
|1123  |2123|3123  |4123  |5123  |
|1321  |2321|3321  |4321  |5321  |
+------+----+------+------+------+

jei2mxaa #1

As an alternative, you can try using aliases, as shown below:

val aliases = dataDF.columns.map(columnName => $"${columnName}".as(map_of_codename.getOrElse(columnName, "None")))
dataDF.select(aliases: _*).show()

dataDF.select(aliases: _*).explain(true)

The execution plan consists of a single Project node, which, for example, may help shorten the optimization phase:

== Analyzed Logical Plan ==
code_1: int, None: int, code_3: int, code_4: int, code_5: int
Project [col_1#16 AS code_1#77, col_2#17 AS None#78, col_3#18 AS code_3#79, col_4#19 AS code_4#80, col_5#20 AS code_5#81]
+- Project [_1#5 AS col_1#16, _2#6 AS col_2#17, _3#7 AS col_3#18, _4#8 AS col_4#19, _5#9 AS col_5#20]
   +- LocalRelation [_1#5, _2#6, _3#7, _4#8, _5#9]

That said, I'm not sure it will solve the performance problem, because in both cases (your foreach version as well as the one above) the physical plan can be optimized down to a single Project node by the CollapseProject rule.
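
A minimal sketch to check this, assuming the dataDF and map_of_codename defined in the question (before the rename loop runs); the optimized logical plan printed by explain(true) should still contain a single Project, even with the per-column renames:

// Rename column by column (the variant from the question), then inspect
// the plan: CollapseProject should merge all the renames into one Project.
var renamedDF = dataDF
dataDF.columns.foreach { c =>
  renamedDF = renamedDF.withColumnRenamed(c, map_of_codename.getOrElse(c, "None"))
}
renamedDF.explain(true) // expect one Project node in the optimized logical plan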
FYI, withColumnRenamed uses a similar approach under the hood; the difference is that it executes separately for each column:

def withColumnRenamed(existingName: String, newName: String): DataFrame = {
  val resolver = sparkSession.sessionState.analyzer.resolver
  val output = queryExecution.analyzed.output
  val shouldRename = output.exists(f => resolver(f.name, existingName))
  if (shouldRename) {
    val columns = output.map { col =>
      if (resolver(col.name, existingName)) {
        Column(col).as(newName)
      } else {
        Column(col)
      }
    }
    select(columns : _*)
  } else {
    toDF()
  }
}

Do you have any more details about the performance problem you observed? What measurements suggest this operation is what takes the time? Maybe it isn't related to the column renaming at all? What do you do with these renamed columns afterwards?


y1aodyip #2

One approach is to first collect the complete column mapping without Spark, and then rename all the columns in a plain for loop instead of calling columns.foreach.
Here is an example of my solution (sorry, I'm not a Scala expert, so some of the data wrangling may be ugly).

var dataDF = Seq((10, 20, 30, 40, 50),(100, 200, 300, 400, 500),(10, 222, 333, 444, 555),(1123, 2123, 3123, 4123, 5123),(1321, 2321, 3321, 4321, 5321))
  .toDF("col_1", "col_2", "col_3", "col_4", "col_5")
dataDF.show(false)

val mapDF = Seq(("col_1", "code_1", "true"),("col_3", "code_3", "true"),("col_4", "code_4", "true"),("col_5", "code_5", "true"))
  .toDF("original_name", "code_name", "important")

val schema_mapping = mapDF.select("original_name", "code_name").collect()

// Columns without a mapping (here col_2) get renamed to "None".
val old_schema = dataDF.columns
val mapped_names = schema_mapping.map(_.getString(0)).toSet
val none_mapping = old_schema.filterNot(mapped_names.contains).map(c => Array(c, "None"))

for (i <- 0 until schema_mapping.length) {
  try {
    dataDF = dataDF.withColumnRenamed(schema_mapping(i)(0).toString, schema_mapping(i)(1).toString)
  } catch {
    case e: Throwable => println("cannot rename " + schema_mapping(i)(0).toString + " to " + schema_mapping(i)(1).toString)
  }
}

for (i <- 0 until none_mapping.length) {
  try {
    dataDF = dataDF.withColumnRenamed(none_mapping(i)(0), none_mapping(i)(1))
  } catch {
    case e: Throwable => println("cannot rename " + none_mapping(i)(0))
  }
}

dataDF.show(false)

In the Spark UI, renaming each column makes it its own stage, but when we look at the DAG visualization, those stages should execute in parallel.
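
For completeness, a one-shot variant is also possible here; this is just a sketch (not from the answers above) assuming the map_of_codename collected in the question. Dataset.toDF(colNames: String*) replaces all the column names in a single projection, avoiding per-column withColumnRenamed calls entirely:

// Build the full target name list on the driver, then rename everything at once.
// Unmapped columns (e.g. col_2) fall back to "None", matching the expected DF.
val newNames = dataDF.columns.map(c => map_of_codename.getOrElse(c, "None"))
val renamedDF = dataDF.toDF(newNames: _*)
renamedDF.show(false)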
