基于另一列的带有lag的窗口函数

ny6fqffe  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(289)

我有以下sparkDataframe:
IDMONTH列1列2 A1100A22001A38002A415003A512000A616001A725002A828003A930004
我想创建一个新列,基于列2给出的动态延迟,我们称之为“dif\u column1”。所需输出为:
IDMONTH列1列2如果列1A110000A22001100A38002700A4150031400A5120000A616001400A750021300A8280031600A9300041800
我尝试过使用lag函数,但显然我只能使用一个整数和lag函数,所以它不起作用:

w = Window.partitionBy("id")
sdf = sdf.withColumn("dif_column1", F.col("column_1") - F.lag("column_1",F.col("column_2")).over(w))
6psbrbz9

6psbrbz91#

您可以添加行号列,并根据行号和列2中定义的延迟进行自联接:

from pyspark.sql import functions as F, Window

w = Window.partitionBy("id").orderBy("month")

df1 = df.withColumn('rn', F.row_number().over(w)) 

df2 = df1.alias('t1').join(
    df1.alias('t2'),
    F.expr('(t1.id = t2.id) and (t1.rn = t2.rn + t1.column_2)'),
    'left'
).selectExpr(
    't1.*',
    't1.column_1 - t2.column_1 as dif_column1'
).drop('rn')

df2.show()
+---+-----+--------+--------+-----------+
| id|month|column_1|column_2|dif_column1|
+---+-----+--------+--------+-----------+
|  A|    1|     100|       0|          0|
|  A|    2|     200|       1|        100|
|  A|    3|     800|       2|        700|
|  A|    4|    1500|       3|       1400|
|  A|    5|    1200|       0|          0|
|  A|    6|    1600|       1|        400|
|  A|    7|    2500|       2|       1300|
|  A|    8|    2800|       3|       1600|
|  A|    9|    3000|       4|       1800|
+---+-----+--------+--------+-----------+

相关问题