假设我有一个Dataframe,其中包含不同用户通过不同协议发出的请求和记录的度量值:
+---+-----+--------+------------+
| ts| user|protocol|metric_value|
+---+-----+--------+------------+
| 0|user1| tcp| 197|
| 1|user1| udp| 155|
| 2|user1| tcp| 347|
| 3|user1| tcp| 117|
| 4|user1| tcp| 230|
| 5|user1| udp| 225|
| 6|user1| udp| 297|
| 7|user1| tcp| 790|
| 8|user1| udp| 216|
| 9|user1| udp| 200|
+---+-----+--------+------------+
我需要为当前用户的每个协议添加另一个列,其中有最后记录的平均度量值(在当前时间戳之前,并且不早于当前的\uts-4)。所以,算法是这样的:
对于每行x:
查找row.user==x.user和row.ts<x.ts的所有行
从这些行中提取每个协议的最新度量值(如果相应的记录早于x.ts-4,则抛出它)
计算这些度量值的平均值
将计算的平均值追加到新列中的行
预期结果如下:
+---+-----+--------+------------+-------+
| ts| user|protocol|metric_value|avg_val|
+---+-----+--------+------------+-------+
| 0|user1| tcp| 197| null| // no data for user1
| 1|user1| udp| 155| 197| // only tcp value available
| 2|user1| tcp| 347| 176| // (197 + 155) / 2
| 3|user1| tcp| 117| 251| // (347 + 155) / 2
| 4|user1| tcp| 230| 136| // (117 + 155) / 2
| 5|user2| udp| 225| null| // because no data for user2
| 6|user1| udp| 297| 230| // because record with ts==1 is too old now
| 7|user1| tcp| 790| 263.5| // (297 + 230) / 2
| 8|user1| udp| 216| 543.5| // (297 + 790) / 2
| 9|user1| udp| 200| 503| // (216 + 790) / 2
+---+-----+--------+------------+-------+
请注意,表中可能有任意数量的协议和用户。
如何实现?
我试过使用窗口函数、lag(1)和按协议分区,但是聚合函数只计算单个分区的平均值,而不计算不同分区的结果。最接近的结果是sql请求使用协议分区上的行数,但我无法在那里传播row.ts<x.ts条件。
1条答案
按热度按时间baubqpgj1#
这是基于scala的解决方案,您可以将逻辑转换为python/pyspark
样本数据:
对于每一行,获取所有行
(protocol,metric_value)
为了current_row.ts -4
在列表中。现在,您在一行中获得了所有必需的信息。您可以编写一个udf来应用获取最新协议类型和平均值的逻辑。