drop\u duplicate是否保证在spark中对Dataframe排序后保留第一行并删除其余行?

dauxcl2d  于 2021-05-26  发布在  Spark
关注(0)|答案(1)|浏览(739)

我有一个Dataframe,从hadoop中的avro文件读取,有三列(a、b、c),其中一列是键列,另外两列中一列是整数类型,另一列是日期类型。
我按整数列和日期列对帧进行排序,然后在生成的帧上调用drop\u duplicates by key列(a)。

  1. frame = frame.orderBy(["b","c"],ascending=False)
  2. frame = frame.drop_duplicate('a')

基于spark scala代码我可以看到 orderBy 在内部调用sort方法,该方法执行全局排序。

  1. /**
  2. * Returns a new Dataset sorted by the given expressions. For example:
  3. * {{{
  4. * ds.sort($"col1", $"col2".desc)
  5. * }}}
  6. *
  7. * @group typedrel
  8. * @since 2.0.0
  9. */
  10. @scala.annotation.varargs
  11. def sort(sortExprs: Column*): Dataset[T] = {
  12. sortInternal(global = true, sortExprs)
  13. }

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/dataset.scala
此外,还根据下面的spark代码将drop_duplicates(cols)方法转换为aggregate(first(cols))。

  1. object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  2. def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
  3. case d @ Deduplicate(keys, child) if !child.isStreaming =>
  4. val keyExprIds = keys.map(_.exprId)
  5. val aggCols = child.output.map { attr =>
  6. if (keyExprIds.contains(attr.exprId)) {
  7. attr
  8. } else {
  9. Alias(new First(attr).toAggregateExpression(), attr.name)()
  10. }
  11. }
  12. // SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
  13. // aggregations by checking the number of grouping keys. The key difference here is that a
  14. // global aggregation always returns at least one row even if there are no input rows. Here
  15. // we append a literal when the grouping key list is empty so that the result aggregate
  16. // operator is properly treated as a grouping aggregation.
  17. val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
  18. val newAgg = Aggregate(nonemptyKeys, aggCols, child)
  19. val attrMapping = d.output.zip(newAgg.output)
  20. newAgg -> attrMapping
  21. }
  22. }

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/dataset.scala
因此,我期望dropduplicate在排序后重新训练第一行,并删除其他行。但在我的星火工作中,我发现这不是真的。
有什么想法吗?为什么?

4jb9z9bj

4jb9z9bj1#

不。
按b&c排序,然后按a删除,如果且仅当只有一个分区要处理时,就可以按您的意愿进行排序。而大数据通常不是这样。
所以,你可以在其他地方搜索: dropDuplicates 保留 first occurrence 一个排序操作-只有当有一个分区,否则它是幸运的。
i、 e.当有更多的分区在运行时是不确定的。
与avro或Pypark无关。此外,b、c的顺序也可能是不确定的。

相关问题