我们是否可以使用一个偏移量值,它取决于sparksql中lead/lag函数的列值?
示例:下面是一些很好的方法。
val sampleData = Seq( ("bob","Developer",125000),
("mark","Developer",108000),
("carl","Tester",70000),
("peter","Developer",185000),
("jon","Tester",65000),
("roman","Tester",82000),
("simon","Developer",98000),
("eric","Developer",144000),
("carlos","Tester",75000),
("henry","Developer",110000)).toDF("Name","Role","Salary")
val window = Window.orderBy("Role")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
sampleData.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
现在,我的用例是这样的,我们必须传递的偏移量取决于integer类型的特定列。这是我想做的:
val sampleData = Seq( ("bob","Developer",125000,2),
("mark","Developer",108000,3),
("carl","Tester",70000,3),
("peter","Developer",185000,2),
("jon","Tester",65000,1),
("roman","Tester",82000,1),
("simon","Developer",98000,2),
("eric","Developer",144000,3),
("carlos","Tester",75000,2),
("henry","Developer",110000,2)).toDF("Name","Role","Salary","ColumnForOffset")
val window = Window.orderBy("Role")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), col("ColumnForOffset")).over(window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
sampleData.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
由于偏移量只取整数值,因此这将引发预期的异常。让我们来讨论一下,我们是否可以实现一个逻辑。
1条答案
按热度按时间ngynwnxp1#
您可以添加行号列,并根据行号和偏移量进行自联接,例如: