Scala: getting "Union can only be performed on tables with the same number of columns" error when joining 2 datasets

pxy2qtax · published 2023-11-18 · in Scala
Follow (0) | Answers (1) | Views (262)

I have two Dataset[ReconEntity], where ReconEntity is:

  case class ReconEntity(rowId: String,
                         groupId: String,
                         amounts: List[Amount],
                         processingDate: Long,
                         attributes: Map[String, String],
                         entityType: String,
                         isDuplicate: String)

The first dataset looks like:

  +-----+-------+------------------+--------------+----------+-----------+
  |rowId|groupId|            amount|processingDate|attributes|isDuplicate|
  +-----+-------+------------------+--------------+----------+-----------+
  |   C1|     G1|USD,10.00000000...|    1551021334|  rowId,C1|      false|
  |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
  |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
  |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
  |   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
  |   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
  |   C3|     G2|USD,6.000000000...|    1551011017|  rowId,C3|       true|
  +-----+-------+------------------+--------------+----------+-----------+


The second dataset looks like:

  +-----+-------+------------------+--------------+----------+-----------+
  |rowId|groupId|            amount|processingDate|attributes|isDuplicate|
  +-----+-------+------------------+--------------+----------+-----------+
  |   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
  |   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
  +-----+-------+------------------+--------------+----------+-----------+


I want the result to look like this:

  +-----+-------+------------------+--------------+----------+-----------+
  |rowId|groupId|            amount|processingDate|attributes|isDuplicate|
  +-----+-------+------------------+--------------+----------+-----------+
  |   C1|     G1|USD,10.00000000...|    1551021334|  rowId,C1|       true|
  |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
  |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
  |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
  |   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
  |   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
  |   C3|     G2|USD,6.000000000...|    1551011017|  rowId,C3|       true|
  +-----+-------+------------------+--------------+----------+-----------+


I join the two datasets with a left join: if a rowId does not exist in the second dataset, I mark the isDuplicate flag as true; otherwise I keep the original value in the result dataset. The logic is:

  inputEntries.as("inputDataset")
    .join(otherEntries.as("otherDataset").select(joinKey), Seq(joinKey), "left")
    .withColumn("isDuplicateFinal",
      when(col("otherDataset.rowId").isNull, lit("true"))
        .otherwise(col("inputDataset.isDuplicate")))
    .drop("otherDataset.isDuplicate")
    .select(
      col("inputDataset.rowId"),
      col("inputDataset.groupId"),
      col("inputDataset.amounts"),
      col("inputDataset.processingDate"),
      col("inputDataset.attributes"),
      col("inputDataset.entityType"),
      col("isDuplicateFinal").alias("isDuplicate")
    ).as[ReconEntity]
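For what it's worth, a common way to avoid having two rowId columns in the joined frame at all is to left-join against a marker column and null-check the marker instead of the right-hand key. This is only a sketch with toy stand-in data (the names inputEntries/otherEntries and the String-typed flag are taken from the question; everything else here is my own assumption):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object MarkerJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("marker-join").getOrCreate()
    import spark.implicits._

    // Toy stand-ins for the two datasets in the question
    val inputEntries = Seq(("C1", "false"), ("C1", "true"), ("C2", "false"))
      .toDF("rowId", "isDuplicate")
    val otherEntries = Seq("C2").toDF("rowId")

    // Tag every right-hand row; a USING-style join then keeps a single rowId column
    val marked = otherEntries.withColumn("matched", lit(true))
    val result = inputEntries
      .join(marked, Seq("rowId"), "left")
      .withColumn("isDuplicate",
        when(col("matched").isNull, lit("true")).otherwise(col("isDuplicate")))
      .drop("matched")

    result.show()
    spark.stop()
  }
}
```

Because the join uses `Seq("rowId")`, the output has exactly one rowId column, so there is nothing ambiguous left to drop or project away afterwards.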


Here joinKey is rowId. This logic works fine locally, but when I try to run the Spark job it fails with:

  ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 7 columns and the second table has 8 columns;
  'Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#526, isDuplicateFinal#7810]
  +- 'Project [rowId#4188, rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#526, CASE WHEN isnull(rowId#4188) THEN true ELSE isDuplicate#526 END AS isDuplicateFinal#7810]
     +- 'Project [rowId#4188, rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#526]
        +- 'Join LeftOuter, (rowId#247 = rowId#4188)
           :- SubqueryAlias inputDataset
           :  +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#526]
           :     +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, CASE WHEN (row_number#517 = 1) THEN false ELSE true END AS isDuplicate#526, row_number#517]
           :        +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#253, row_number#517]
           :           +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#253, row_number#517, row_number#517]
           :              +- Window [row_number() windowspecdefinition(rowId#247, processingDate#250L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#517], [rowId#247], [processingDate#250L DESC NULLS LAST]
           :                 +- Project [rowId#247, groupId#248, amounts#249, processingDate#250L, attributes#251, entityType#252, isDuplicate#253]


I can't work out where the UNION happens — whether during as[ReconEntity] or some other operation — or why a new column is being produced, given that I drop it after joining the 2 datasets.
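One thing worth checking (an assumption on my part, not confirmed by the question): in the Spark versions I have used, Dataset.drop with a string argument only matches plain top-level column names, so a qualified name like "otherDataset.isDuplicate" silently drops nothing, whereas dropping by Column goes through the analyzer and does resolve the alias. A minimal sketch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object DropQualifiedSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("drop-qualified").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a")).toDF("id", "v").as("t")

    // drop by string is compared against plain column names: "t.v" matches nothing
    println(df.drop("t.v").columns.mkString(","))
    // drop by Column is resolved by the analyzer, so the alias qualifier works
    println(df.drop(col("t.v")).columns.mkString(","))

    spark.stop()
  }
}
```

If that is what is happening here, the extra column would survive the drop and the final schema would carry one more column than expected.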


fjaof16o1#

I'm not sure where the UNION in this code happens either. But is the .select(joinKey) really necessary? Could you try removing it and running it once?
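One way to act on this suggestion while keeping a projection of the right side is to rename its key, so the joined frame never contains two rowId columns and the null check stays unambiguous. A sketch under my own assumptions (toy stand-ins for the question's datasets, with joinKey fixed to "rowId" and the flag kept as a String as in ReconEntity):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object RenamedKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("renamed-key").getOrCreate()
    import spark.implicits._

    // Toy stand-ins for inputEntries / otherEntries from the question
    val inputEntries = Seq(("C1", "false"), ("C2", "false")).toDF("rowId", "isDuplicate")
    val otherEntries = Seq("C2").toDF("rowId")

    // Project only the key from the right side, but under a distinct name
    val other = otherEntries.select(col("rowId").as("otherRowId")).distinct()
    val result = inputEntries
      .join(other, col("rowId") === col("otherRowId"), "left")
      .withColumn("isDuplicate",
        when(col("otherRowId").isNull, lit("true")).otherwise(col("isDuplicate")))
      .drop("otherRowId")

    result.show()
    spark.stop()
  }
}
```

Since "otherRowId" is an unqualified top-level name, the final drop by string works as expected here.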
