如何根据scala中有许多条目的其他Dataframe的列更新sparkDataframe?

cngwdvgl  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(240)

我正在使用spark dataframes并希望更新一个列 column_to_be_updated 在scala中使用sparksql的配置单元表中。
到目前为止,我的代码适用于较小的Dataframe:

var data_frame = spark.sql("Select ... From TableXX")

var id_list = spark.sql("Select Id From TableXY Where ...")..collect().map(_(0)).toList

data_frame.withColumn("column_to_be_updated", when($"other_column_of_frame".isin(id_list:_*), 1)
    .otherwise($"column_to_be_updated"))

我想要的是更新这个专栏 column_to_be_updated 如果输入 other_column-of_frame 在的id列中 TableXY . 我的解决方法是先将id列强制转换为列表,然后使用 .isin -声明。
然而,我有很多排在 TableXY 以及 TableXX 所以它似乎崩溃了,超载了 id_list . 对于我正在努力实现的目标,是否有其他解决方法或更有效的解决方案?
提前谢谢!

weylhg0b

weylhg0b1#

可以使用外部左连接来连接Dataframe。通过这种方式 Id 列可以添加到 data_frameother_column_of_frame 在ID列表中。然后,只需检查新添加的 Id 列是否为空。

val ids = spark.sql("Select Id From TableXY Where ...")
val updated = data_frame
  .join(broadcast(ids), ids.col("Id") === data_frame.col("other_column_of_frame"), "left_outer")
  .withColumn("column_to_be_updated", when($"Id".isNotNull, 1).otherwise($"column_to_be_updated"))
  .drop("Id")

你可以读到 broadcast 这里:Dataframe连接优化-广播哈希连接

相关问题