星火行号无次序分割保持自然次序

0lvr5msh  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(340)

我想实现无需任何排序的分区,这样数据就可以保持在Dataframe中的自然排序。请分享任何建议,提前谢谢。
假设spark数据框中有以下数据

raw data
----------------------------
 name | item id |   action
----------------------------
 John |    120  |   sell 
----------------------------
 John |    320  |   buy
----------------------------
 Jane |    120  |   sell 
----------------------------
 Jane |    450  |   buy
----------------------------
 Sam  |    360  |   sell 
----------------------------
 Sam  |    300  |   hold
----------------------------
 Sam  |    450  |   buy
----------------------------
 Tim  |    470  |   buy
----------------------------

此表架构中有两个规则

1. Every one has at least one action `buy`
2. Every one's last action must be `buy` as well

现在我想添加一个序列列,只是为了显示每个人的操作顺序

expectation
--------------------------------------
 name | item id |   action  |  seq   
--------------------------------------
 John |    120  |   sell    |  1
--------------------------------------
 John |    320  |   buy     |  2
--------------------------------------
 Jane |    120  |   sell    |  1
--------------------------------------
 Jane |    450  |   buy     |  2
--------------------------------------
 Sam  |    360  |   sell    |  1
--------------------------------------
 Sam  |    300  |   hold    |  2
--------------------------------------
 Sam  |    450  |   buy     |  3
--------------------------------------
 Tim  |    470  |   buy     |  1
--------------------------------------

这是我的密码

import org.apache.spark.sql.functions.{row_number}
import org.apache.spark.sql.expressions.Window
....

val df = spark.read.json(....)
val spec = Window.partitionBy($"name").orderBy(lit(1))         <-- don't know what to used for order by

val dfWithSeq = df.withColumn("seq", row_number.over(spec))   <--- please show me the magic

有趣的是 dfWithSeq ,显示每个人下的操作都有随机序列,因此使用seq时,操作不再遵循原始数据表中给定的顺序。但是我找不到解决办法。

actual result
--------------------------------------
 name | item id |   action  |  seq   
--------------------------------------
 John |    120  |   sell    |  1
--------------------------------------
 John |    320  |   buy     |  2
--------------------------------------
 Jane |    120  |   sell    |  2          <-- this is wrong
--------------------------------------
 Jane |    450  |   buy     |  1          <-- this is wrong
--------------------------------------
 Sam  |    360  |   sell    |  1
--------------------------------------
 Sam  |    300  |   hold    |  2
--------------------------------------
 Sam  |    450  |   buy     |  3
--------------------------------------
 Tim  |    470  |   buy     |  1
--------------------------------------
deyfvvtc

deyfvvtc1#

需要使用:
转换为rdd并返回df后的zipwithindex。这是一个狭窄的转换,它将保留您的(初始)数据顺序。
然后,你要适当地考虑到名称中的序列号或其他任何东西来进行分区。
剩下的就交给你了。

inb24sb2

inb24sb22#

使用 monotonically_increasing_id .

import org.apache.spark.sql.functions.{row_number, monotonically_increasing_id}
import org.apache.spark.sql.expressions.Window
....

val df = spark.read.json(....)
val spec = Window.partitionBy($"name").orderBy($"order")

val dfWithSeq = df.withColumn("order", monotonically_increasing_id)
  .withColumn("seq", row_number.over(spec))

相关问题