scala—动态重命名dataframe中的列,然后与另一个表连接

5w9g7ksd  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(390)

在dataframe中有一个如下所示的属性表

在要重命名的列中,
我必须根据这个输入重命名这个列
如果cust\u id标志是yes,我只想加入customer表
在最后的输出中,我想用实际的列名显示散列值 val maintab_df = maintableval cust_df = customertable
在将主表列e重命名为a之后连接主表和客户表。 maintable.a = customertable.a

4uqofj5v

4uqofj5v1#

下面是一个如何操作的示例:

propertydf.show
+-----------------+------------+
|columns-to-rename|cust_id_flag|
+-----------------+------------+
|(e to a),(d to b)|           Y|
+-----------------+------------+

val columns_to_rename = propertydf.head(1)(0).getAs[String]("columns-to-rename")
val cust_id_flag = propertydf.head(1)(0).getAs[String]("cust_id_flag")

val parsed_columns = columns_to_rename.split(",")
    .map(c => c.replace("(", "").replace(")", "")
    .split(" to "))
// parsed_columns: Array[Array[String]] = Array(Array(e, a), Array(d, b))

val rename_columns = maintab_df.columns.map(c => {
    val matched = parsed_columns.filter(p => c == p(0))
    if (matched.size != 0)
        col(c).as(matched(0)(1).toString) 
    else 
        col(c)
})
// rename_columns: Array[org.apache.spark.sql.Column] = Array(e AS `a`, f, c, d AS `b`)

val select_columns = maintab_df.columns.map(c => {
    val matched = parsed_columns.filter(p => c == p(0))
    if (matched.size != 0) 
        col(matched(0)(1) + "_hash").as(matched(0)(1).toString) 
    else 
        col(c)
})
// select_columns: Array[org.apache.spark.sql.Column] = Array(a_hash AS `a`, f, c, b_hash AS `b`)

val join_cond = parsed_columns.map(_(1))
// join_cond: Array[String] = Array(a, b)

if (cust_id_flag == "Y") {
    val result = maintab_df.select(rename_columns:_*)
                           .join(cust_df, join_cond)
                           .select(select_columns:_*)
} else {
    val result = maintab_df
}

result.show
+------+---+---+--------+
|     a|  f|  c|       b|
+------+---+---+--------+
|*****!|  1| 11|    &&&&|
|****%|  2| 12|;;;;;;;;|
|*****@|  3| 13|  \\\\\\|
+------+---+---+--------+

相关问题