我有一个sparkDataframe,包含如下数据:
+----+---------------------+-------+----------+-------------+
| ID | Timestamp | Value | Interval | Consumption |
+----+---------------------+-------+----------+-------------+
| 1 | 2012-05-02 12:30:00 | 550 | 1 | 5 |
| 1 | 2012-05-02 12:45:00 | 551 | 1 | 1 |
| 1 | 2012-05-02 13:00:00 | 554 | 1 | 3 |
| 1 | 2012-05-02 14:00:00 | 578 | 4 | 24 |
| 1 | 2012-05-02 14:15:00 | 578 | 1 | 0 |
| 1 | 2012-05-02 14:30:00 | 584 | 1 | 6 |
+----+---------------------+-------+----------+-------------+
我想把它变成如下的东西:
+----+---------------------+-------+----------+-------------+------------+
| ID | Timestamp | Value | Interval | Consumption | Estimation |
+----+---------------------+-------+----------+-------------+------------+
| 1 | 2012-05-02 12:30:00 | 550 | 1 | 5 | ? |
| 1 | 2012-05-02 12:45:00 | 551 | 1 | 1 | ? |
| 1 | 2012-05-02 13:00:00 | 554 | 1 | 3 | ? |
| 1 | 2012-05-02 13:15:00 | 560 | 1 | 6 | 4 |
| 1 | 2012-05-02 13:30:00 | 566 | 1 | 6 | 4 |
| 1 | 2012-05-02 13:45:00 | 572 | 1 | 6 | 4 |
| 1 | 2012-05-02 14:00:00 | 578 | 1 | 6 | 4 |
| 1 | 2012-05-02 14:15:00 | 578 | 1 | 0 | ? |
| 1 | 2012-05-02 14:30:00 | 584 | 1 | 6 | ? |
+----+---------------------+-------+----------+-------------+------------+
更具体地说,我想把这个:
+----+---------------------+-------+----------+-------------+
| ID | Timestamp | Value | Interval | Consumption |
+----+---------------------+-------+----------+-------------+
| 1 | 2012-05-02 14:00:00 | 578 | 4 | 24 |
+----+---------------------+-------+----------+-------------+
对此:
+----+---------------------+-------+----------+-------------+------------+
| ID | Timestamp | Value | Interval | Consumption | Estimation |
+----+---------------------+-------+----------+-------------+------------+
| 1 | 2012-05-02 13:15:00 | 560 | 1 | 6 | 4 |
| 1 | 2012-05-02 13:30:00 | 566 | 1 | 6 | 4 |
| 1 | 2012-05-02 13:45:00 | 572 | 1 | 6 | 4 |
| 1 | 2012-05-02 14:00:00 | 578 | 1 | 6 | 4 |
+----+---------------------+-------+----------+-------------+------------+
我想从原始表中取出间隔超过1的行,为丢失的间隔插入值,并将新创建的行重新插入原始行的初始表位置。我知道如何实现这一点(例如,在postgresql中,我只需使用generate_series()函数来创建所需的时间戳并计算新的值),但是在spark/scala中实现这些是很麻烦的。
假设我创建了一个新的Dataframe,其中只包含interval>1的行,那么如何在interval值为n的情况下复制这些行n次呢?我相信这足以让我开始使用计数器函数,它由我可以创建的行引用进行划分。
如果有一种方法可以复制我错过的generate_series()的行为,那就更好了。
暂无答案!
目前还没有任何答案,快来回答吧!