我正在使用spark dataframes并希望更新一个列 column_to_be_updated
在scala中使用sparksql的配置单元表中。
到目前为止,我的代码适用于较小的Dataframe:
var data_frame = spark.sql("Select ... From TableXX")
var id_list = spark.sql("Select Id From TableXY Where ...")..collect().map(_(0)).toList
data_frame.withColumn("column_to_be_updated", when($"other_column_of_frame".isin(id_list:_*), 1)
.otherwise($"column_to_be_updated"))
我想要的是更新这个专栏 column_to_be_updated
如果输入 other_column-of_frame
在的id列中 TableXY
. 我的解决方法是先将id列强制转换为列表,然后使用 .isin
-声明。
然而,我有很多排在 TableXY
以及 TableXX
所以它似乎崩溃了,超载了 id_list
. 对于我正在努力实现的目标,是否有其他解决方法或更有效的解决方案?
提前谢谢!
1条答案
按热度按时间weylhg0b1#
可以使用外部左连接来连接Dataframe。通过这种方式
Id
列可以添加到data_frame
在other_column_of_frame
在ID列表中。然后,只需检查新添加的Id
列是否为空。你可以读到
broadcast
这里:Dataframe连接优化-广播哈希连接