在pyspark/spark中,从一个小的框架中删除存在于一个较大框架中的键

6pp0gazn  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(139)

我有两个篮子,

  1. df_A的大小为几兆字节,主键P
  2. df_B的大小为几千兆字节,也有该行P
    如何有效地从df_A中删除所有P存在于df_B中的记录?
    目前我使用朴素的spark sql语法来完成这一点,没有任何优化
  1. df_A.createOrReplaceTempView('df_A_table')
  2. df_B.createOrReplaceTempView('df_B_table')
  3. spark.sql("""
  4. select
  5. *
  6. from df_A_table
  7. where df_A_table.P not in (
  8. select P from df_B_table
  9. )
  10. """)

字符串
广播连接在这里有意义吗?如果有,它在这里有什么帮助(我只是猜测它可能工作,但不确定)

lc8prwob

lc8prwob1#

df_A中删除P存在于df_B中的所有记录可以通过在df_A和df_B之间执行left anti-join来完成。
下面是一个例子,其中表A和B可以在P上保持反连接:

  1. df_A:
  2. +---+---+
  3. | P| Q|
  4. +---+---+
  5. | 1| 10|
  6. | 2| 20|
  7. | 3| 30|
  8. | 4| 40|
  9. +---+---+
  10. df_B:
  11. +---+---+
  12. | P| S|
  13. +---+---+
  14. | 1|100|
  15. | 3|300|
  16. | 5|500|
  17. +---+---+

字符串
通常对于连接(或反连接),pyspark将需要对两个表的数据进行 Shuffle ,并将两个表中具有匹配P值的部分放入相同的节点中。但是, Shuffle 一个大表是昂贵的,因此广播连接将较大的表拆分到不同的节点中。(而不将其混洗),并且整个较小的表被“广播”或复制到每个节点,以与较大表的每个片段连接。
在你的例子中,由于df_B比df_A大得多,我们广播较小的框架将其发送到所有节点,以与较大的框架(在所有节点之间拆分)连接:

  1. df_A.join(
  2. F.broadcast(df_B),
  3. df_A.P == df_B.P,
  4. 'leftanti'
  5. ).select(
  6. df_A.P, df_A.Q
  7. )


测试结果:

  1. +---+---+
  2. | P| Q|
  3. +---+---+
  4. | 2| 20|
  5. | 4| 40|
  6. +---+---+

展开查看全部

相关问题