scala 按序列Spark分组记录

cbjzeqam  于 12个月前  发布在  Scala
关注(0)|答案(2)|浏览(80)

我有下面的数据集

Date    | status       |
20230101| C            |
20230101| C            |
20230101| R            |
20230101| C            |
20230101| C            |
20230101| R            |

我需要以这样一种方式执行分组,即每个状态记录都与以前的状态记录进行比较,如果值不同,则它们是同一组,否则它们属于不同的组。
输出应

Date    | status       | id
20230101| C            |1
20230101| C            |2
20230101| R            |2
20230101| C            |2
20230101| C            |3
20230101| R            |3
Dataset<Row> ds =  dataset
            .withColumn("newVal", when(col("status").equalTo("C"), 1).otherwise(0))
            .withColumn("id", expr("row_number() over (order by Date)"))
            .persist(StorageLevel.MEMORY_ONLY());

我觉得这不管用,有人能帮帮我吗?
分组总是从R开始

bwitn5fc

bwitn5fc1#

这里的答案实际上是一个反答案。如果数据是大规模的,这不是Spark应该做的事情。为什么?为什么?

  • EXPLAIN揭示了第一个提供的解决方案-Exchange SinglePartition,ENSURE_REQUIREMENTS,[plan_id=xxxx]这意味着OOM可能,即使分区大小更大。在Spark总是一件坏事。
  • 在第一次提供的解决方案中重复计算。
  • 也没有自然的分组键可以单独在分区内工作。对于一个分区来说,“Date”可能也有太多的数据。
  • 试图用lag来解决这个问题,lead函数不能可靠地工作,因为lead和lag只能在一个分区内工作。作为一个可以解决的数据管理员,但它变得令人费解,第一个提供的答案是优雅的。

所以,如果数据很小,这是对Spark的使用,你可以在Spark中做到这一点。否则,实际上总是在传统的DB中执行此操作。

ioekq8ef

ioekq8ef2#

通过比较当前和以前的状态,以及由范围内的创建组start分配的组id,可以找到组start:

val df = Seq(
  ("20230101", "C"),
  ("20230101", "C"),
  ("20230101", "R"),
  ("20230101", "C"),
  ("20230101", "C"),
  ("20230101", "R"),
).toDF(
  "Date", "status"
)

val naturalOrderWindow = Window.orderBy(lit(1))
// compare current and previous status
val dateRowNumWindow = Window.orderBy($"Date", $"row_number")
val isGroupStartColumn = coalesce(lag($"status", 1).over(dateRowNumWindow), $"status") === $"status"

val withGroupStartDF = df
  .withColumn("row_number", row_number().over(naturalOrderWindow))
  .withColumn("isGroupStart", isGroupStartColumn)

withGroupStartDF.show(false)

val rowNumberWindow =  Window.orderBy($"row_number").rangeBetween(Window.unboundedPreceding, Window.currentRow)

val result = withGroupStartDF
  .withColumn("id", max(
    when($"isGroupStart", $"row_number").otherwise(0)
  ).over(rowNumberWindow))
  .drop("row_number","isGroupStart")

输出量:

+--------+------+----------+------------+
|Date    |status|row_number|isGroupStart|
+--------+------+----------+------------+
|20230101|C     |1         |true        |
|20230101|C     |2         |true        |
|20230101|R     |3         |false       |
|20230101|C     |4         |false       |
|20230101|C     |5         |true        |
|20230101|R     |6         |false       |
+--------+------+----------+------------+

+--------+------+---+
|Date    |status|id |
+--------+------+---+
|20230101|C     |1  |
|20230101|C     |2  |
|20230101|R     |2  |
|20230101|C     |2  |
|20230101|C     |5  |
|20230101|R     |5  |
+--------+------+---+

相关问题