我有以下数据集:
|value|
+-----+
| 1|
| 2|
| 3|
字符串
我想创建一个新列 newValue,它从前一行获取 newValue 的值并对其执行一些操作。为了简单起见,只需递增3。如果没有前一列,则在第一行的情况下,应该获取 value。结果如下所示:
|value|newValue|
+-----+--------+
| 1| 1|
| 2| 4| # newValue previous row (1) + 3
| 3| 7| # newValue previous row (4) + 3
型
我尝试了下面的代码,但是在尝试访问前一行时,新列 newValue 似乎还不存在。如何在withColumn中访问新创建的列?
val data = Seq(1, 2, 3)
val dataset: Dataset[Int] = data.toDS()
val windowSpec = Window.orderBy("value")
val result = dataset.withColumn("newValue", coalesce(lag("newValue", 1).over(windowSpec) + 3, $"value"))
型
这将导致以下错误消息:
org.apache.spark.sql.AnalysisException:[UNRESOLVED_COLUMN.WITH_SUGGESTION]无法解析名为“newValue”的列或函数参数。是否为以下之一?[“value”]
3条答案
按热度按时间zfycwa2u1#
我不确定是否可以使用窗口函数来实现这一点。你可以通过将DataFrame转换为RDD或List来进行计算。我在一个例子中测试了它,它似乎可以工作,但不确定这是否是一种优化的方法。
字符串
rwqw0loc2#
无法访问以前的聚合值。对于这种特定情况,
newValue
可以计算为-所有以前值的总和,加上当前位置乘以三:字符串
输出量:
型
fumotvh33#
我相信你所需要的就是
running sum
和一个常数3
个字符
输出
型