pysparkDataframe

f4t66c6m  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(340)

我的情况如下。我有一个主Dataframedf1。我在for循环中进行处理以反映更改,我的伪代码如下所示。

for Year in [2019, 2020]:
  query_west = query_{Year}
  df_west = spark.sql(query_west)
  df_final = DF1.join(df_west, on['ID'], how='left')

在这种情况下,df\u final将加入查询并在每次迭代中更新,对吗?我希望在for循环中的每个迭代中,这些更改都反映在我的主Dataframedf1上。
请告诉我我的逻辑是否正确。谢谢。

m528fe3b

m528fe3b1#

正如@venky\的评论所建议的,您需要添加另一行 DF1 = df_final 在for循环的末尾,为了确保在每次迭代中更新df1。
另一种方法是使用 reduce 一次合并所有的连接。例如

from functools import reduce

dfs = [DF1]
for Year in [2019, 2020]:
  query_west = f'query_{Year}'
  df_west = spark.sql(query_west)
  dfs.append(df_west)

df_final = reduce(lambda x, y: x.join(y, 'ID', 'left'), dfs)

相当于

df_final = DF1.join(spark.sql('query_2019'), 'ID', 'left').join(spark.sql('query_2020'), 'ID', 'left')

相关问题