Pyspark:按一列的值排序,但根据另一列生成组ID

g9icjywg  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(217)

我有一个这样的相框

  1. import pyspark.sql.functions as F
  2. from pyspark.sql.window import Window
  3. have = spark.createDataFrame(
  4. [('a', 'r1', '1'),
  5. ('b', 'r1', '2'),
  6. ('c', 'r1', '3'),
  7. ('d', 's3', '4'),
  8. ('e', 's3', '5'),
  9. ('f', 's4', '6'),
  10. ('g', 'r1', '7')],
  11. ['id', 'group_col', 'order_col'])

字符串
我想基于group_col创建一个组ID列,但仅当组发生变化时。因此,当r1组再次出现时,它将获得与第一个出现的r1不同的组ID。

  1. want = spark.createDataFrame(
  2. [('a', 'r1', '1', '1'),
  3. ('b', 'r1', '2', '1'),
  4. ('c', 'r1', '3', '1'),
  5. ('d', 's3', '4', '2'),
  6. ('e', 's3', '5', '2'),
  7. ('f', 's4', '6', '3'),
  8. ('g', 'r1', '7', '4')],
  9. ['id', 'group_col', 'order_col', 'rleid'])
  10. want.show()
  11. +---+---------+---------+-----+
  12. | id|group_col|order_col|rleid|
  13. +---+---------+---------+-----+
  14. | a| r1| 1| 1|
  15. | b| r1| 2| 1|
  16. | c| r1| 3| 1|
  17. | d| s3| 4| 2|
  18. | e| s3| 5| 2|
  19. | f| s4| 6| 3|
  20. | g| r1| 7| 4|
  21. +---+---------+---------+-----+


组ID不必是连续的,我只需要一种方法使每个组都是唯一的。
基本上,我希望在data.table R包中有类似rleid function的东西。等效的R代码是:

  1. library(data.table)
  2. df <- data.table(
  3. id = letters[1:7],
  4. group_col = c("r1", "r1", "r1", "s3", "s3", "s4", "r1"),
  5. order_col = c(1:7)
  6. )
  7. setorder(df, order_col)
  8. df[, `:=` (rleid = rleid(group_col))]
  9. df
  10. id group_col order_col rleid
  11. 1: a r1 1 1
  12. 2: b r1 2 1
  13. 3: c r1 3 1
  14. 4: d s3 4 2
  15. 5: e s3 5 2
  16. 6: f s4 6 3
  17. 7: g r1 7 4


我已经尝试过rank()dense_rank()超过group_col

  1. df = have.withColumn("rleid", F.dense_rank().over(Window.orderBy('group_col')))
  2. df.show()
  3. +---+---------+---------+-----+
  4. | id|group_col|order_col|rleid|
  5. +---+---------+---------+-----+
  6. | a| r1| 1| 1|
  7. | b| r1| 2| 1|
  8. | c| r1| 3| 1|
  9. | g| r1| 7| 1|
  10. | d| s3| 4| 2|
  11. | e| s3| 5| 2|
  12. | f| s4| 6| 3|
  13. +---+---------+---------+-----+


这并没有给予我想要的结果,因为id=g应该有rleid=4。
我也尝试过基于this answerarray_sort(),但不幸的是这也不起作用。

  1. df = (have
  2. .withColumn("rank", F.array_sort(F.collect_set('group_col').over(Window.orderBy('order_col').rowsBetween(Window.unboundedPreceding, Window.currentRow))))
  3. .withColumn('rleid', F.expr("array_position(rank, group_col)")))
  4. df.show()
  5. +---+---------+---------+------------+-----+
  6. | id|group_col|order_col| rank|rleid|
  7. +---+---------+---------+------------+-----+
  8. | a| r1| 1| [r1]| 1|
  9. | b| r1| 2| [r1]| 1|
  10. | c| r1| 3| [r1]| 1|
  11. | d| s3| 4| [r1, s3]| 2|
  12. | e| s3| 5| [r1, s3]| 2|
  13. | f| s4| 6|[r1, s3, s4]| 3|
  14. | g| r1| 7|[r1, s3, s4]| 1|
  15. +---+---------+---------+------------+-----+

jtjikinw

jtjikinw1#

比较group_col中的当前行和前一行,以标记group更改的边界点,然后计算边界条件上的累积和,以分配唯一的id

  1. W = Window.orderBy('order_col')
  2. cond = F.lag('group_col').over(W) != F.col('group_col')
  3. want = have.withColumn('rleid', F.coalesce(F.sum(cond.cast('int')).over(W), F.lit(0)))

个字符

相关问题