Apache Spark: joining DataFrames and renaming result columns with the same name

2ul0zpep · asked on 2023-04-12 in Apache

A short example:

vals1 = [(1, "a"),
         (2, "b")]
columns1 = ["id", "name"]
df1 = spark.createDataFrame(data=vals1, schema=columns1)

vals2 = [(1, "k")]
columns2 = ["id", "name"]
df2 = spark.createDataFrame(data=vals2, schema=columns2)

df1 = df1.alias('df1').join(df2.alias('df2'), 'id', 'full')
df1.show()
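
Its output looks like this (reconstructed from the data above; row order from a join is not guaranteed):

+---+----+----+
| id|name|name|
+---+----+----+
|  1|   a|   k|
|  2|   b|null|
+---+----+----+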

The result has one column named id and two columns named name. Assuming the real DataFrames have dozens of columns like this, how can I rename the columns with duplicated names?


hlswsv35 · 1#

Another approach, which renames only the intersecting columns:

from typing import List

from pyspark.sql import DataFrame

def join_intersect(df_left: DataFrame, df_right: DataFrame, join_cols: List[str], how: str = 'inner') -> DataFrame:
    # Columns present in both frames...
    intersected_cols = set(df_left.columns).intersection(set(df_right.columns))
    # ...minus the join keys, which must keep their names
    cols_to_rename = [c for c in intersected_cols if c not in join_cols]

    # Give each clashing column a different suffix on each side
    for c in cols_to_rename:
        df_left = df_left.withColumnRenamed(c, f"{c}__1")
        df_right = df_right.withColumnRenamed(c, f"{c}__2")

    return df_left.join(df_right, on=join_cols, how=how)

vals1 = [(1, "a"), (2, "b")]
columns1 = ["id", "name"]
df1 = spark.createDataFrame(data=vals1, schema=columns1)
vals2 = [(1, "k")]
columns2 = ["id", "name"]
df2 = spark.createDataFrame(data=vals2, schema=columns2)

df_joined = join_intersect(df1, df2, ['name'])
df_joined.show()
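
Note that with the sample data an inner join on 'name' matches no rows (df1 has names a/b, df2 has k), so the show() above prints an empty table. Joining on the id key instead matches the question's setup; a quick sketch, with the output reconstructed from the data above:

df_joined_on_id = join_intersect(df1, df2, ['id'], how='full')
df_joined_on_id.show()

+---+-------+-------+
| id|name__1|name__2|
+---+-------+-------+
|  1|      a|      k|
|  2|      b|   null|
+---+-------+-------+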

t8e9dugd · 2#

You can rename the duplicated columns before the join, except for the ones the join itself needs:

import pyspark.sql.functions as F

def add_prefix(df, prefix, columns=None):
    # Prepend a prefix to the given columns (all columns by default)
    if not columns:
        columns = df.columns
    return df.select(*[F.col(c).alias(prefix + c if c in columns else c) for c in df.columns])

def add_suffix(df, suffix, columns=None):
    # Append a suffix to the given columns (all columns by default)
    if not columns:
        columns = df.columns
    return df.select(*[F.col(c).alias(c + suffix if c in columns else c) for c in df.columns])

join_cols = ['id']
# Columns that appear on both sides but are not join keys
columns_to_rename = [c for c in df1.columns if c in df2.columns and c not in join_cols]
df2 = add_suffix(df2, '_y', columns_to_rename)
# Pass the list itself so this also works with multiple join columns
df3 = df1.join(df2, join_cols, 'full')
df3.show()
+---+----+------+
| id|name|name_y|
+---+----+------+
|  1|   a|     k|
|  2|   b|  null|
+---+----+------+
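
If you want both sides marked rather than only the right one, the same helpers can suffix df1 too. A sketch, assuming df1 and df2 are the original, not-yet-renamed frames (the _x/_y suffixes are arbitrary):

df1_x = add_suffix(df1, '_x', columns_to_rename)
df2_y = add_suffix(df2, '_y', columns_to_rename)
df4 = df1_x.join(df2_y, join_cols, 'full')  # columns: id, name_x, name_y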

gzszwxb4 · 3#

@quaziqarta showed how to rename the columns before the join; note that you can also rename them after the join:

import pyspark.sql.functions as F

join_column = 'id'
# Alias both sides so each "name" column can be addressed unambiguously
df_joined = df1.alias('df1').join(df2.alias('df2'), join_column, 'full') \
    .select(
        [join_column] +
        [F.col('df1.' + c).alias(c + "_1") for c in df1.columns if c != join_column] +
        [F.col('df2.' + c).alias(c + "_2") for c in df2.columns if c != join_column]
    )
df_joined.show()

+---+------+------+
| id|name_1|name_2|
+---+------+------+
|  1|     a|     k|
|  2|     b|  null|
+---+------+------+

You only need to alias the DataFrames (as you did in your example) to be able to tell Spark which one you mean when you ask for the column "name".
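
Without aliases (or some other disambiguating column reference), asking the joined frame for "name" fails. A minimal sketch of what goes wrong:

from pyspark.sql.utils import AnalysisException

try:
    # Both inputs still carry a column called "name"
    df1.join(df2, 'id', 'full').select('name').show()
except AnalysisException as e:
    # Spark cannot tell which "name" is meant and
    # reports an ambiguous reference
    print(e)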


fae0ux8s · 4#

You can simply use a for loop to rename every column of the second DataFrame except the join column:

vals1 = [(1, "a"),
         (2, "b"),
         ]
columns1 = ["id", "name"]
df1 = spark.createDataFrame(data=vals1, schema=columns1)

vals2 = [(1, "k"),
         ]
columns2 = ["id", "name"]
df2 = spark.createDataFrame(data=vals2, schema=columns2)
# Suffix everything in df2 except the join column
for i in df2.columns:
    if i != 'id':
        df2 = df2.withColumnRenamed(i, i + '_1')

df1 = df1.alias('df1').join(df2.alias('df2'), 'id', 'full')
df1.show()
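
The result keeps df1's columns untouched and suffixes df2's (reconstructed from the data above):

+---+----+------+
| id|name|name_1|
+---+----+------+
|  1|   a|     k|
|  2|   b|  null|
+---+----+------+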
