我有两个 Dataframe df_1
和df_2
:
rdd = spark.sparkContext.parallelize([
(1, '', '5647-0394'),
(2, '', '6748-9384'),
(3, '', '9485-9484')])
df_1 = spark.createDataFrame(rdd, schema=['ID', 'UPDATED_MESSAGE', 'ZIP_CODE'])
# +---+---------------+---------+
# | ID|UPDATED_MESSAGE| ZIP_CODE|
# +---+---------------+---------+
# | 1| |5647-0394|
# | 2| |6748-9384|
# | 3| |9485-9484|
# +---+---------------+---------+
rdd = spark.sparkContext.parallelize([
('JAMES', 'INDIA_WON', '6748-9384')])
df_2 = spark.createDataFrame(rdd, schema=['NAME', 'CODE', 'ADDRESS_CODE'])
# +-----+---------+------------+
# | NAME| CODE|ADDRESS_CODE|
# +-----+---------+------------+
# |JAMES|INDIA_WON| 6748-9384|
# +-----+---------+------------+
我需要更新df_1
列'UPDATED MESSAGE'的值'INDIA_WON'从df_2列'CODE'。目前列“UPDATED_MESSAGE”是空的。我需要更新每行的值为'INDIA_WON',我们如何在PySpark中做到这一点?这里的条件是如果我们在df_1
列“ZIP_CODE”中找到'ADDRESS_CODE”值,我们需要填充'UPDATED_MESSAGE' = 'INDIA_WON'中的所有值。
4条答案
按热度按时间f8rj6qna1#
我希望我已经很好地解释了你所需要的东西。如果是的话,那么你的逻辑看起来很奇怪。看起来,你的表很小。Spark是大数据的引擎(数百万到数十亿条记录)。如果你的表很小,考虑用Pandas做事情。
kyvafyod2#
下面的Python方法返回原始的
df_1
(当在df_2
中没有找到ZIP_CODE
匹配时)或修改后的df_1
(其中UPDATED_MESSAGE
列用df_2.CODE
列中的值填充):pwuypxnk3#
我建议在这种情况下使用广播连接,以避免过度混洗。
代码和逻辑如下
4jb9z9bj4#
Spark SQL如此简单,为什么还要使用 Dataframe ?
将数据框转换为临时视图。
编写简单的Spark SQL来获得答案。
查询的输出。如果需要写入磁盘,请使用spark.sql()创建一个 Dataframe 。
用新答案覆盖整个数据框。