pyspark窗口,在组之间聚合结果

gr8qqesn  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(478)

假设我有一个Dataframe,其中包含不同用户通过不同协议发出的请求和记录的度量值:

  1. +---+-----+--------+------------+
  2. | ts| user|protocol|metric_value|
  3. +---+-----+--------+------------+
  4. | 0|user1| tcp| 197|
  5. | 1|user1| udp| 155|
  6. | 2|user1| tcp| 347|
  7. | 3|user1| tcp| 117|
  8. | 4|user1| tcp| 230|
  9. | 5|user1| udp| 225|
  10. | 6|user1| udp| 297|
  11. | 7|user1| tcp| 790|
  12. | 8|user1| udp| 216|
  13. | 9|user1| udp| 200|
  14. +---+-----+--------+------------+

我需要为当前用户的每个协议添加另一个列,其中有最后记录的平均度量值(在当前时间戳之前,并且不早于当前的\uts-4)。所以,算法是这样的:
对于每行x:
查找row.user==x.user和row.ts<x.ts的所有行
从这些行中提取每个协议的最新度量值(如果相应的记录早于x.ts-4,则抛出它)
计算这些度量值的平均值
将计算的平均值追加到新列中的行
预期结果如下:

  1. +---+-----+--------+------------+-------+
  2. | ts| user|protocol|metric_value|avg_val|
  3. +---+-----+--------+------------+-------+
  4. | 0|user1| tcp| 197| null| // no data for user1
  5. | 1|user1| udp| 155| 197| // only tcp value available
  6. | 2|user1| tcp| 347| 176| // (197 + 155) / 2
  7. | 3|user1| tcp| 117| 251| // (347 + 155) / 2
  8. | 4|user1| tcp| 230| 136| // (117 + 155) / 2
  9. | 5|user2| udp| 225| null| // because no data for user2
  10. | 6|user1| udp| 297| 230| // because record with ts==1 is too old now
  11. | 7|user1| tcp| 790| 263.5| // (297 + 230) / 2
  12. | 8|user1| udp| 216| 543.5| // (297 + 790) / 2
  13. | 9|user1| udp| 200| 503| // (216 + 790) / 2
  14. +---+-----+--------+------------+-------+

请注意,表中可能有任意数量的协议和用户。
如何实现?
我试过使用窗口函数、lag(1)和按协议分区,但是聚合函数只计算单个分区的平均值,而不计算不同分区的结果。最接近的结果是sql请求使用协议分区上的行数,但我无法在那里传播row.ts<x.ts条件。

baubqpgj

baubqpgj1#

这是基于scala的解决方案,您可以将逻辑转换为python/pyspark
样本数据:

  1. val df = Seq((0,"user1","tcp",197),(1,"user1","udp",155),(2,"user1","tcp",347),(3,"user1","tcp",117),(4,"user1","tcp",230),(5,"user2","udp",225),(6,"user1","udp",297),(7,"user1","tcp",790),(8,"user1","udp",216),(9,"user1","udp",200))
  2. .toDF("ts","user","protocol","metric_value")

对于每一行,获取所有行 (protocol,metric_value) 为了 current_row.ts -4 在列表中。

  1. val winspec = Window.partitionBy("user").orderBy("ts").rangeBetween(Window.currentRow - 4, Window.currentRow-1)
  2. val df2 = df.withColumn("recent_list", collect_list(struct($"protocol", $"metric_value")).over(winspec))
  3. df2.orderBy("ts").show(false)
  4. /*
  5. +---+-----+--------+------------+------------------------------------------------+
  6. |ts |user |protocol|metric_value|recent_list |
  7. +---+-----+--------+------------+------------------------------------------------+
  8. |0 |user1|tcp |197 |[] |
  9. |1 |user1|udp |155 |[[tcp, 197]] |
  10. |2 |user1|tcp |347 |[[tcp, 197], [udp, 155]] |
  11. |3 |user1|tcp |117 |[[tcp, 197], [udp, 155], [tcp, 347]] |
  12. |4 |user1|tcp |230 |[[tcp, 197], [udp, 155], [tcp, 347], [tcp, 117]]|
  13. |5 |user2|udp |225 |[] |
  14. |6 |user1|udp |297 |[[tcp, 347], [tcp, 117], [tcp, 230]] |
  15. |7 |user1|tcp |790 |[[tcp, 117], [tcp, 230], [udp, 297]] |
  16. |8 |user1|udp |216 |[[tcp, 230], [udp, 297], [tcp, 790]] |
  17. |9 |user1|udp |200 |[[udp, 297], [tcp, 790], [udp, 216]] |
  18. +---+-----+--------+------------+------------------------------------------------+

现在,您在一行中获得了所有必需的信息。您可以编写一个udf来应用获取最新协议类型和平均值的逻辑。

  1. def getAverageValueForUniqRecents(list : Array[StructType]): Double = {
  2. // you logic goes here.
  3. // Loop through your array in REVERSE ORDER
  4. // maintain a set to check if protocol already visited then skip, otherwise SUM
  5. //Finally average
  6. }
展开查看全部

相关问题