Spark SQL physical plan does not reuse the Exchange

rwqw0loc · asked 2021-05-24 · Spark

I am trying to optimize the physical plan for the following transformation:

1. Read data from "pad" and "pi".
2. Find the rows in "pad" that have a reference in "pi", and transform some columns.
3. Find the rows in "pad" that have no reference in "pi", and transform some columns.
4. Union the results of steps 2 and 3.
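
The code below refers to columns as $"pad.X" / $"pi.Y", which only resolves if the two DataFrames carry those aliases. A minimal setup sketch (not in the original post: the table names and the RegionId = 'R' filter are taken from the FileScan nodes in the plan further down; everything else is assumed):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // Table names and filters are read off the FileScan nodes below; the
    // aliases make the $"pad.*" / $"pi.*" column references resolvable.
    val pad = spark.table("default.purchaseaccountingdocument_delta")
      .where($"RegionId" === "R")
      .alias("pad")
    val pi = spark.table("default.purchaseinvoice_delta")
      .where($"RegionID" === "R")
      .alias("pi")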

    // Step 2: rows of "pad" that have a matching reference in "pi".
    val pad_in_pi = pad
      .join(
        pi,
        $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode",
        "inner"
      )
      .selectExpr(
        "pad.AccountingDocumentKeyCode",
        "pad.RegionId",
        "pi.PurchaseInvoiceLineNumber as DocumentLineNumber",
        "pi.CodingBlockSequentialNumber"
      )

    // Step 3: rows of "pad" with no reference in "pi".
    val pad_not_in_pi = pad
      .join(
        pi,
        $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode",
        "anti"
      )
      .selectExpr(
        "pad.AccountingDocumentKeyCode",
        "pad.RegionId",
        "pad.AccountingDocumentLineNumber as DocumentLineNumber",
        // NB: unquoted, so Spark parses this as the integer literal 1
        // (see "1 AS CodingBlockSequentialNumber#1210" in the plan below).
        "0001 as CodingBlockSequentialNumber"
      )

    // Step 4: union both branches.
    pad_in_pi.union(pad_not_in_pi)

Steps 2 and 3 use the same join expression, so the exchanges should be reusable, but the current physical plan does not reuse them. What is the reason?

    == Physical Plan ==
    Union
    :- *(3) Project [AccountingDocumentKeyCode#491, RegionId#539, PurchaseInvoiceLineNumber#205 AS DocumentLineNumber#954, CodingBlockSequentialNumber#203]
    :  +- *(3) SortMergeJoin [ReferenceKeyCode#538], [PurchaseInvoiceKeyCode#235], Inner
    :     :- Sort [ReferenceKeyCode#538 ASC NULLS FIRST], false, 0
    :     :  +- Exchange hashpartitioning(ReferenceKeyCode#538, 200), true, [id=#684]
    :     :     +- *(1) Project [AccountingDocumentKeyCode#491, ReferenceKeyCode#538, RegionId#539]
    :     :        +- *(1) Filter ((isnotnull(RegionId#539) AND (RegionId#539 = R)) AND isnotnull(ReferenceKeyCode#538))
    :     :           +- *(1) ColumnarToRow
    :     :              +- FileScan parquet default.purchaseaccountingdocument_delta[AccountingDocumentKeyCode#491,ReferenceKeyCode#538,RegionId#539] Batched: true, DataFilters: [isnotnull(RegionId#539), (RegionId#539 = R), isnotnull(ReferenceKeyCode#538)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionId), EqualTo(RegionId,R), IsNotNull(ReferenceKeyCode)], ReadSchema: struct<AccountingDocumentKeyCode:string,ReferenceKeyCode:string,RegionId:string>
    :     +- Sort [PurchaseInvoiceKeyCode#235 ASC NULLS FIRST], false, 0
    :        +- Exchange hashpartitioning(PurchaseInvoiceKeyCode#235, 200), true, [id=#692]
    :           +- *(2) Project [CodingBlockSequentialNumber#203, PurchaseInvoiceLineNumber#205, PurchaseInvoiceKeyCode#235]
    :              +- *(2) Filter ((isnotnull(RegionID#207) AND (RegionID#207 = R)) AND isnotnull(PurchaseInvoiceKeyCode#235))
    :                 +- *(2) ColumnarToRow
    :                    +- FileScan parquet default.purchaseinvoice_delta[CodingBlockSequentialNumber#203,PurchaseInvoiceLineNumber#205,RegionID#207,PurchaseInvoiceKeyCode#235] Batched: true, DataFilters: [isnotnull(RegionID#207), (RegionID#207 = R), isnotnull(PurchaseInvoiceKeyCode#235)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionID), EqualTo(RegionID,R), IsNotNull(PurchaseInvoiceKeyCode)], ReadSchema: struct<CodingBlockSequentialNumber:string,PurchaseInvoiceLineNumber:string,RegionID:string,Purcha...
    +- *(6) Project [AccountingDocumentKeyCode#491, RegionId#539, AccountingDocumentLineNumber#492 AS DocumentLineNumber#1208, 1 AS CodingBlockSequentialNumber#1210]
       +- SortMergeJoin [ReferenceKeyCode#538], [PurchaseInvoiceKeyCode#235], LeftAnti
          :- Sort [ReferenceKeyCode#538 ASC NULLS FIRST], false, 0
          :  +- Exchange hashpartitioning(ReferenceKeyCode#538, 200), true, [id=#703]
          :     +- *(4) Project [AccountingDocumentKeyCode#491, AccountingDocumentLineNumber#492, ReferenceKeyCode#538, RegionId#539]
          :        +- *(4) Filter (isnotnull(RegionId#539) AND (RegionId#539 = R))
          :           +- *(4) ColumnarToRow
          :              +- FileScan parquet default.purchaseaccountingdocument_delta[AccountingDocumentKeyCode#491,AccountingDocumentLineNumber#492,ReferenceKeyCode#538,RegionId#539] Batched: true, DataFilters: [isnotnull(RegionId#539), (RegionId#539 = R)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionId), EqualTo(RegionId,R)], ReadSchema: struct<AccountingDocumentKeyCode:string,AccountingDocumentLineNumber:string,ReferenceKeyCode:stri...
          +- Sort [PurchaseInvoiceKeyCode#235 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(PurchaseInvoiceKeyCode#235, 200), true, [id=#710]
                +- *(5) Project [PurchaseInvoiceKeyCode#235]
                   +- *(5) Filter ((isnotnull(RegionID#207) AND (RegionID#207 = R)) AND isnotnull(PurchaseInvoiceKeyCode#235))
                      +- *(5) ColumnarToRow
                         +- FileScan parquet default.purchaseinvoice_delta[RegionID#207,PurchaseInvoiceKeyCode#235] Batched: true, DataFilters: [isnotnull(RegionID#207), (RegionID#207 = R), isnotnull(PurchaseInvoiceKeyCode#235)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionID), EqualTo(RegionID,R), IsNotNull(PurchaseInvoiceKeyCode)], ReadSchema: struct<RegionID:string,PurchaseInvoiceKeyCode:string>
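
A plausible explanation, reading the plan above (an editorial note, not part of the original thread): Spark's ReuseExchange rule only replaces a shuffle with a ReusedExchange node when two Exchange subtrees produce the same result, and none of these four do. The inner join null-rejects its keys, so its "pad" scan carries an extra isnotnull(ReferenceKeyCode) filter and a narrower projection without AccountingDocumentLineNumber (id=#684), while the anti-join branch keeps that column and drops the filter (id=#703); likewise the two "pi" scans project different column sets (id=#692 vs id=#710). Exchange reuse itself is on by default (Spark's internal spark.sql.exchange.reuse flag), so configuration is rarely the cause. A minimal sketch of a case where reuse does fire, because both subtrees are identical (illustrative only):

    // Reuses the `spark` session from the setup sketch above. Two identical
    // aggregation subplans are joined, so their shuffles produce the same
    // result and Spark can replace the second one with a ReusedExchange.
    val base = spark.range(0L, 1000000L).selectExpr("id % 100 AS k", "id AS v")
    val left = base.groupBy("k").count()
    val right = base.groupBy("k").count()
    // Look for a "ReusedExchange" node in the printed plan (the exact shape
    // varies with Spark version and adaptive query execution settings).
    left.join(right, Seq("k")).explain()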

Answer from rpppsulh:

Not a direct answer to the exchange-reuse question, but try a left outer join to get rid of the union:

    pad
      .join(
        pi,
        $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode",
        "left_outer"
      )
      .selectExpr(
        "pad.AccountingDocumentKeyCode",
        "pad.RegionId",
        // Matched rows take pi's values; unmatched rows fall back to pad's
        // line number and to the string literal '0001'.
        "coalesce(pi.PurchaseInvoiceLineNumber, pad.AccountingDocumentLineNumber) as DocumentLineNumber",
        "coalesce(pi.CodingBlockSequentialNumber, '0001') as CodingBlockSequentialNumber"
      )
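
This is row-for-row equivalent to the inner-plus-anti union: matched "pad" rows come out once per matching "pi" row, and unmatched ones come out once with nulls that the coalesce calls fill in. Note that '0001' is quoted here, so it stays a string, unlike the unquoted literal in the question. To check the shape of the new plan (a sketch; "merged" is a name introduced here for the expression above, not from the answer):

    // Hypothetical binding for the left-outer-join DataFrame shown above.
    // explain() prints the physical plan; expect a single FileScan per table
    // and no Union node, so there is no duplicate Exchange left to reuse.
    merged.explain()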
