Apache Spark 如何通过从另一个 Dataframe 获取值来更新 Dataframe 列?

k5ifujac  于 2022-11-16  发布在  Apache
关注(0)|答案(4)|浏览(138)

我有两个 Dataframe df_1df_2

rdd = spark.sparkContext.parallelize([
    (1, '', '5647-0394'),
    (2, '', '6748-9384'),
    (3, '', '9485-9484')])
df_1 = spark.createDataFrame(rdd, schema=['ID', 'UPDATED_MESSAGE', 'ZIP_CODE'])
# +---+---------------+---------+
# | ID|UPDATED_MESSAGE| ZIP_CODE|
# +---+---------------+---------+
# |  1|               |5647-0394|
# |  2|               |6748-9384|
# |  3|               |9485-9484|
# +---+---------------+---------+

rdd = spark.sparkContext.parallelize([
    ('JAMES', 'INDIA_WON', '6748-9384')])
df_2 = spark.createDataFrame(rdd, schema=['NAME', 'CODE', 'ADDRESS_CODE'])
# +-----+---------+------------+
# | NAME|     CODE|ADDRESS_CODE|
# +-----+---------+------------+
# |JAMES|INDIA_WON|   6748-9384|
# +-----+---------+------------+

我需要更新df_1列'UPDATED MESSAGE'的值'INDIA_WON'从df_2列'CODE'。目前列“UPDATED_MESSAGE”是空的。我需要更新每行的值为'INDIA_WON',我们如何在PySpark中做到这一点?这里的条件是如果我们在df_1列“ZIP_CODE”中找到'ADDRESS_CODE”值,我们需要填充'UPDATED_MESSAGE' = 'INDIA_WON'中的所有值。

f8rj6qna

f8rj6qna1#

我希望我已经很好地解释了你所需要的东西。如果是的话,那么你的逻辑看起来很奇怪。看起来,你的表很小。Spark是大数据的引擎(数百万到数十亿条记录)。如果你的表很小,考虑用Pandas做事情。

from pyspark.sql import functions as F

df_2 = df_2.groupBy('ADDRESS_CODE').agg(F.first('CODE').alias('CODE'))

df_joined = df_1.join(df_2, df_1.ZIP_CODE == df_2.ADDRESS_CODE, 'left')
df_filtered = df_joined.filter(~F.isnull('ADDRESS_CODE'))
if bool(df_filtered.head(1)):
    df_1 = df_1.withColumn('UPDATED_MESSAGE', F.lit(df_filtered.head()['CODE']))

df_1.show()
# +---+---------------+---------+
# | ID|UPDATED_MESSAGE| ZIP_CODE|
# +---+---------------+---------+
# |  1|      INDIA_WON|5647-0394|
# |  2|      INDIA_WON|6748-9384|
# |  3|      INDIA_WON|9485-9484|
# +---+---------------+---------+
kyvafyod

kyvafyod2#

下面的Python方法返回原始的df_1(当在df_2中没有找到ZIP_CODE匹配时)或修改后的df_1(其中UPDATED_MESSAGE列用df_2.CODE列中的值填充):

from pyspark.sql.functions import lit

def update_df1(df_1, df_2):
    if (df_1.join(df_2, on=(col("ZIP_CODE") == col("ADDRESS_CODE")), how="inner").count() == 0):
        return df_1
    code = df_2.collect()[0]["CODE"]
    return df_1.withColumn("UPDATED_MESSAGE", lit(code))
    
update_df1(df_1, df_2).show()

+---+---------------+---------+
| ID|UPDATED_MESSAGE| ZIP_CODE|
+---+---------------+---------+
|  1|      INDIA_WON|5647-0394|
|  2|      INDIA_WON|6748-9384|
|  3|      INDIA_WON|9485-9484|
+---+---------------+---------+
pwuypxnk

pwuypxnk3#

我建议在这种情况下使用广播连接,以避免过度混洗。
代码和逻辑如下

new=(df_1.drop('UPDATED_MESSAGE').join(broadcast(df_2.drop('NAME')),how='left', on=df_1.ZIP_CODE==df_2.ADDRESS_CODE)#Drop the null column and join
     .drop('ADDRESS_CODE')#Drop column no longer neede
     .toDF('ID', 'ZIP_CODE', 'UPDATED_MESSAGE')#rename new df
    ).show()
4jb9z9bj

4jb9z9bj4#

Spark SQL如此简单,为什么还要使用 Dataframe ?
将数据框转换为临时视图。

%python
df_1.createOrReplaceTempView("tmp_zipcodes")
df_2.createOrReplaceTempView("tmp_person")

编写简单的Spark SQL来获得答案。

%sql 
select 
  a.id, 
  case when b.code is null then '' else b.code end as update_message, 
  a.zip_code 
from tmp_zipcodes as a
left join tmp_person as b
on a.zip_code = b.address_code

查询的输出。如果需要写入磁盘,请使用spark.sql()创建一个 Dataframe 。

用新答案覆盖整个数据框。

sql_txt = """
  select 
    a.id, 
    case when b.code is null then '' else b.code end as update_message, 
    a.zip_code 
  from tmp_zipcodes as a
  left join tmp_person as b
  on a.zip_code = b.address_code
"""
df_1 = spark.sql(sql_txt)

相关问题