在第二个Dataframe中出现时过滤一个Dataframe皮斯帕克需要优雅的解决方案

cnh2zyt3  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(283)

我有一个问题。我们有两个输入Dataframe-upper\u in和lower\u in,我们需要在df中的lower\u出现时过滤df中的upper\u行。这意味着,如果从上\u in开始的行在下\u in,则该行将写入新的df(uppper\u out)。如果从上\u-in到下\u-in的行不在下\u-in,则它将被写入其他df(下\u-out)
我的直截了当的解决方案:
迭代df中的上\u<---问题
检查内容窗体上\u是否在中的下\u
真-写入上\u输出
false-写入以降低输出

upper_columns = upper_input.collect()
lower_columns = lower_input.collect()

# investigate map method

upper_output = spark.createDataFrame(spark.sparkContext.emptyRDD(), upper_input.schema)
lower_output = spark.createDataFrame(spark.sparkContext.emptyRDD(), lower_input.schema)

for upper_row, lower_row in zip(upper_columns, lower_columns):

# check if content is the same in upper and lower row

    if upper_row[selected_columns[0][0]] == lower_row[selected_columns[0][1]]:
         row_to_append = spark.createDataFrame([upper_row], upper_input.schema)
         upper_output = upper_output.union(row_to_append)
         row_to_append.unpersist()
     else:
         row_to_append = spark.createDataFrame([upper_row], lower_input.schema)
         lower_output = lower_output.union(row_to_append)
         row_to_append.unpersist()

但我希望smth更优雅,而且不使用.collect()函数。
理论上可以使用.map()或.foreach(),但我们需要为df创建类似smth的.pop()函数。
先谢谢你

n1bvdmb6

n1bvdmb61#

你可以用 intersectAll 以及 exceptAll 检查 upper_inlower_in :

upper_out = upper_in.intersectAll(lower_in)
lower_out = upper_in.exceptAll(lower_in)

或者,你可以做一个 left_semi 或者 left_anti 在所有列上联接:

upper_out = upper_in.join(lower_in, upper_in.columns, 'left_semi')
lower_out = upper_in.join(lower_in, upper_in.columns, 'left_anti')

如果要指定要检查的列列表,可以执行以下操作:

cols = ['Location']
upper_out = upper_in.join(lower_in, cols 'left_semi')
lower_out = upper_in.join(lower_in, cols, 'left_anti')

相关问题