我有一个名为datadf的dataframe,我想重命名它的列。其他Dataframemapdf有“原始\u名称”->“代码\u名称”Map。我想将datadf的列名从“原始名称”更改为“代码名称”,因为mapdf具有这些值。我试图在循环中重新分配datadf,但是当数据量很大并且失去并行性时,会产生低性能。对于一个巨大的datadf数据集,可以用更好的方法来实现并行性和良好的性能吗?
import sparkSession.sqlContext.implicits._
var dataDF = Seq((10, 20, 30, 40, 50),(100, 200, 300, 400, 500),(10, 222, 333, 444, 555),(1123, 2123, 3123, 4123, 5123),(1321, 2321, 3321, 4321, 5321))
.toDF("col_1", "col_2", "col_3", "col_4", "col_5")
dataDF.show(false)
val mapDF = Seq(("col_1", "code_1", "true"),("col_3", "code_3", "true"),("col_4", "code_4", "true"),("col_5", "code_5", "true"))
.toDF("original_name", "code_name", "important")
mapDF.show(false)
val map_of_codename = mapDF.rdd.map(x => (x.getString(0), x.getString(1))).collectAsMap()
dataDF.columns.foreach(x => {
if (map_of_codename.contains(x))
dataDF = dataDF.withColumnRenamed(x, map_of_codename.get(x).get)
else
dataDF = dataDF.withColumnRenamed(x, "None")
}
)
dataDF.show(false)
========================
dataDF
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10 |20 |30 |40 |50 |
|100 |200 |300 |400 |500 |
|10 |222 |333 |444 |555 |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+
mapDF
+-------------+---------+---------+
|original_name|code_name|important|
+-------------+---------+---------+
|col_1 |code_1 |true |
|col_3 |code_3 |true |
|col_4 |code_4 |true |
|col_5 |code_5 |true |
+-------------+---------+---------+
expected DF:
+------+----+------+------+------+
|code_1|None|code_3|code_4|code_5|
+------+----+------+------+------+
|10 |20 |30 |40 |50 |
|100 |200 |300 |400 |500 |
|10 |222 |333 |444 |555 |
|1123 |2123|3123 |4123 |5123 |
|1321 |2321|3321 |4321 |5321 |
+------+----+------+------+------+
2条答案
按热度按时间jei2mxaa1#
作为替代,您可以尝试使用别名,如下所示:
执行计划将由单个投影节点组成,例如,它可能有助于减少优化阶段:
也就是说,我不确定它是否能解决性能问题,因为在这两种情况下,您的
foreach
通过上述方案,可以将物理方案优化到单个节点CollapseProject
规则。仅供参考,
withColumnRenamed
在引擎盖下使用类似的方法,不同的是它对每个列分别执行:你对观察到的性能问题有什么意见吗?有哪些措施可以帮助确定操作需要时间?也许这不一定与列重命名有关?你以后对这些重命名的列做什么?
y1aodyip2#
一种方法是先获得不带spark的完整Map列列表,然后到for循环以重命名所有列,而不是call columns.foreach
下面是我的解决方案的一个例子(对不起,我不是scala方面的Maven,有些数据解析可能很难看)
在spark ui中,为每一列重命名它,使其成为一个stage,但是当我们看到dag可视化时,这些stage应该并行执行。