PySpark: renaming Spark DataFrame columns based on a CSV file

envsm3lx asked on 2021-05-24 in Spark
Follow(0) | Answers(3) | Views(722)

I have the following DataFrame:

+--------+---------------+--------------------+---------+
|province|           city|      infection_case|confirmed|
+--------+---------------+--------------------+---------+
|   Seoul|     Yongsan-gu|       Itaewon Clubs|      139|
|   Seoul|      Gwanak-gu|             Richway|      119|
|   Seoul|        Guro-gu| Guro-gu Call Center|       95|
|   Seoul|   Yangcheon-gu|Yangcheon Table T...|       43|
|   Seoul|      Dobong-gu|     Day Care Center|       43|

Now I want to rename the columns (the header row) according to a CSV file that looks like this:

province,any_other__name
city,any_other__name      
infection_case,any_other__name
confirmed,any_other__name

Here is my code:

cases = spark.read.load("/home/tool/Desktop/database/TEST/archive/Case.csv",
                        format="csv", sep=",", inferSchema="true", header="true")
cases = cases.select('province','city','infection_case','confirmed')
cases \
  .write \
  .mode('overwrite') \
  .option('header', 'true') \
  .csv('8.csv')

voase2hg1#

The solution here is to rename a column from `old_name` to `new_name` in PySpark using `selectExpr()` with the `as` keyword.

cases = cases.selectExpr("province as names1", "city as names2", "confirmed as names3")
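Instead of hard-coding the new names, the `selectExpr()` arguments can be built from the mapping CSV itself. A minimal sketch, with the mapping held in memory so it is self-contained (replace `io.StringIO(...)` with `open(path)` for a real file; the names mirror the answer above):

```python
import csv
import io

# Two-column mapping: old_name,new_name (in-memory stand-in for the CSV file)
mapping_csv = "province,names1\ncity,names2\nconfirmed,names3\n"

# Build one "old as new" expression per mapping row
exprs = ["{} as {}".format(old, new)
         for old, new in csv.reader(io.StringIO(mapping_csv))]
print(exprs)  # ['province as names1', 'city as names2', 'confirmed as names3']

# cases = cases.selectExpr(*exprs)  # then apply, assuming `cases` exists
```

Note that `selectExpr` only keeps the columns you list, so every column you want in the result must appear in the mapping file.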

nbewdwxp2#

The best approach is to use the `withColumnRenamed` method:

with open("path/to/file.csv") as f:
    for line in f:
        old_name, new_name = line.strip().split(",")
        cases = cases.withColumnRenamed(old_name, new_name)
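The bare `split(",")` above fails on blank lines and keeps any stray whitespace around the names (the question's sample file has trailing spaces). A slightly more defensive sketch of the parsing step, using hypothetical in-memory lines so it runs on its own:

```python
# Hypothetical mapping lines, including a blank line and stray spaces
lines = ["province,any_other__name", "", "city , any_other__name "]

pairs = []
for line in lines:
    line = line.strip()
    if not line:
        continue  # ignore empty lines in the mapping file
    # split on the first comma only, then trim each side
    old_name, new_name = (part.strip() for part in line.split(",", 1))
    pairs.append((old_name, new_name))

print(pairs)  # [('province', 'any_other__name'), ('city', 'any_other__name')]
# for old_name, new_name in pairs:
#     cases = cases.withColumnRenamed(old_name, new_name)
```

`withColumnRenamed` is a no-op when `old_name` does not exist in the DataFrame, so a bad mapping row fails silently rather than raising an error.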

xmd2e60i3#

# Define a (old_name, new_name) mapping, then rename
# all required columns using withColumnRenamed.
schema = {
    'province': 'any_province__name',
    'city': 'any_city__name',
    'infection_case': 'any_infection_case__name',
    'confirmed': 'any_confirmed__name'
}

def rename_column(df=None, schema=None):
    for column in df.columns:
        # Fall back to the original name if the column is not in the mapping
        df = df.withColumnRenamed(column, schema.get(column, column))
    return df

df_final = rename_column(df=df,schema=schema)
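When the mapping covers the columns you care about, the same rename can also be done in a single call with `toDF`, which takes the full list of new names in order. A sketch of building that list (the `columns` list stands in for `df.columns`, since no live DataFrame is available here):

```python
# Same mapping as above
schema = {
    'province': 'any_province__name',
    'city': 'any_city__name',
    'infection_case': 'any_infection_case__name',
    'confirmed': 'any_confirmed__name',
}

columns = ['province', 'city', 'infection_case', 'confirmed']  # i.e. df.columns
new_names = [schema.get(c, c) for c in columns]  # unmapped names pass through
print(new_names)

# df_final = df.toDF(*new_names)  # single-call alternative to the rename loop
```

`toDF` avoids rebuilding the DataFrame plan once per column, which can matter when renaming many columns.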
