spark中的累积和

mfuanj7w  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(353)

我想在spark里做累加和。这是寄存器表(输入):

+---------------+-------------------+----+----+----+
|     product_id|          date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  53|  52|
|4008607333T.upf|2017-12-13:02:27:03|3-47|  53|  52|
|4008607333T.upf|2017-12-13:02:27:08|3-46|  53|  52|
|4008607333T.upf|2017-12-13:02:28:01|3-47|  53|  52|
|4008607333T.upf|2017-12-13:02:28:07|3-46|  15|   1|
+---------------+-------------------+----+----+----+

配置单元查询:

select *, SUM(val1) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val1_sum, SUM(val2) over ( Partition by product_id, ack order by date_time rows between unbounded preceding and current row ) val2_sum from test

输出:

+---------------+-------------------+----+----+----+-------+--------+
|     product_id|          date_time| ack|val1|val2|val_sum|val2_sum|
+---------------+-------------------+----+----+----+-------+--------+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  53|  52|     53|      52|
|4008607333T.upf|2017-12-13:02:27:08|3-46|  53|  52|    106|     104|
|4008607333T.upf|2017-12-13:02:28:07|3-46|  15|   1|    121|     105|
|4008607333T.upf|2017-12-13:02:27:03|3-47|  53|  52|     53|      52|
|4008607333T.upf|2017-12-13:02:28:01|3-47|  53|  52|    106|     104|
+---------------+-------------------+----+----+----+-------+--------+

使用spark逻辑,我得到了相同的上述输出:

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time)
import org.apache.spark.sql.functions._

val newDf = inputDF.withColumn("val_sum", sum('val1) over w).withColumn("val2_sum", sum('val2) over w)
newDf.show

然而,当我在spark cluster上尝试这个逻辑时 val_sum 值将是累积和的一半,并且某个时间它是不同的。我不知道为什么会发生在星火团上。是因为隔墙吗?
如何计算Spark簇上一列的累积和?

k5hmc34c

k5hmc34c1#

要使用dataframeapi获得累积和,应该使用 rowsBetween 窗口方法。在spark 2.1及更新版本中,创建如下窗口:

val w = Window.partitionBy($"product_id", $"ack")
  .orderBy($"date_time")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

这将告诉spark使用从分区开始到当前行的值。使用旧版本的spark,使用 rowsBetween(Long.MinValue, 0) 同样的效果。
要使用窗口,请使用与以前相同的方法:

val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
  .withColumn("val2_sum", sum($"val2").over(w))

相关问题