我正在尝试为下面的转换优化物理计划。
从“pad”和“pi”读取数据
在“pad”中查找在“pi”中有引用的行,并转换某些列。
在“pad”中查找在“pi”中没有引用的行,并转换某些列。
合并第2行和第3行。
val pad_in_pi = pad
.join(
pi
, $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode"
, "inner"
)
.selectExpr(
"pad.AccountingDocumentKeyCode"
, "pad.RegionId"
, "pi.PurchaseInvoiceLineNumber as DocumentLineNumber"
, "pi.CodingBlockSequentialNumber"
)
val pad_not_in_pi = pad
.join(
pi
, $"pad.ReferenceKeyCode" === $"pi.PurchaseInvoiceKeyCode"
, "anti"
)
.selectExpr(
"pad.AccountingDocumentKeyCode"
, "pad.RegionId"
, "pad.AccountingDocumentLineNumber as DocumentLineNumber"
, "0001 as CodingBlockSequentialNumber"
)
pad_in_pi.union(pad_not_in_pi)
分支2和分支3使用相同的联接表达式,因此可以重用交换。目前的实际计划没有。原因是什么?
== Physical Plan ==
Union
:- *(3) Project [AccountingDocumentKeyCode#491, RegionId#539, PurchaseInvoiceLineNumber#205 AS DocumentLineNumber#954, CodingBlockSequentialNumber#203]
: +- *(3) SortMergeJoin [ReferenceKeyCode#538], [PurchaseInvoiceKeyCode#235], Inner
: :- Sort [ReferenceKeyCode#538 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(ReferenceKeyCode#538, 200), true, [id=#684]
: : +- *(1) Project [AccountingDocumentKeyCode#491, ReferenceKeyCode#538, RegionId#539]
: : +- *(1) Filter ((isnotnull(RegionId#539) AND (RegionId#539 = R)) AND isnotnull(ReferenceKeyCode#538))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.purchaseaccountingdocument_delta[AccountingDocumentKeyCode#491,ReferenceKeyCode#538,RegionId#539] Batched: true, DataFilters: [isnotnull(RegionId#539), (RegionId#539 = R), isnotnull(ReferenceKeyCode#538)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionId), EqualTo(RegionId,R), IsNotNull(ReferenceKeyCode)], ReadSchema: struct<AccountingDocumentKeyCode:string,ReferenceKeyCode:string,RegionId:string>
: +- Sort [PurchaseInvoiceKeyCode#235 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(PurchaseInvoiceKeyCode#235, 200), true, [id=#692]
: +- *(2) Project [CodingBlockSequentialNumber#203, PurchaseInvoiceLineNumber#205, PurchaseInvoiceKeyCode#235]
: +- *(2) Filter ((isnotnull(RegionId#207) AND (RegionId#207 = R)) AND isnotnull(PurchaseInvoiceKeyCode#235))
: +- *(2) ColumnarToRow
: +- FileScan parquet default.purchaseinvoice_delta[CodingBlockSequentialNumber#203,PurchaseInvoiceLineNumber#205,RegionID#207,PurchaseInvoiceKeyCode#235] Batched: true, DataFilters: [isnotnull(RegionID#207), (RegionID#207 = R), isnotnull(PurchaseInvoiceKeyCode#235)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionID), EqualTo(RegionID,R), IsNotNull(PurchaseInvoiceKeyCode)], ReadSchema: struct<CodingBlockSequentialNumber:string,PurchaseInvoiceLineNumber:string,RegionID:string,Purcha...
+- *(6) Project [AccountingDocumentKeyCode#491, RegionId#539, AccountingDocumentLineNumber#492 AS DocumentLineNumber#1208, 1 AS CodingBlockSequentialNumber#1210]
+- SortMergeJoin [ReferenceKeyCode#538], [PurchaseInvoiceKeyCode#235], LeftAnti
:- Sort [ReferenceKeyCode#538 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(ReferenceKeyCode#538, 200), true, [id=#703]
: +- *(4) Project [AccountingDocumentKeyCode#491, AccountingDocumentLineNumber#492, ReferenceKeyCode#538, RegionId#539]
: +- *(4) Filter (isnotnull(RegionId#539) AND (RegionId#539 = R))
: +- *(4) ColumnarToRow
: +- FileScan parquet default.purchaseaccountingdocument_delta[AccountingDocumentKeyCode#491,AccountingDocumentLineNumber#492,ReferenceKeyCode#538,RegionId#539] Batched: true, DataFilters: [isnotnull(RegionId#539), (RegionId#539 = R)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionId), EqualTo(RegionId,R)], ReadSchema: struct<AccountingDocumentKeyCode:string,AccountingDocumentLineNumber:string,ReferenceKeyCode:stri...
+- Sort [PurchaseInvoiceKeyCode#235 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(PurchaseInvoiceKeyCode#235, 200), true, [id=#710]
+- *(5) Project [PurchaseInvoiceKeyCode#235]
+- *(5) Filter ((isnotnull(RegionId#207) AND (RegionId#207 = R)) AND isnotnull(PurchaseInvoiceKeyCode#235))
+- *(5) ColumnarToRow
+- FileScan parquet default.purchaseinvoice_delta[RegionID#207,PurchaseInvoiceKeyCode#235] Batched: true, DataFilters: [isnotnull(RegionID#207), (RegionID#207 = R), isnotnull(PurchaseInvoiceKeyCode#235)], Format: Parquet, Location: PreparedDeltaFileIndex[dbfs:..., PartitionFilters: [], PushedFilters: [IsNotNull(RegionID), EqualTo(RegionID,R), IsNotNull(PurchaseInvoiceKeyCode)], ReadSchema: struct<RegionID:string,PurchaseInvoiceKeyCode:string>
1条答案
按热度按时间rpppsulh1#
不直接回答有关exchange重用的问题,但请尝试使用左外联接来除去联合: