How do I explode the rows of a DataFrame in Spark?

ycl3bljg · asked 2021-06-26 · Hive

I have a Spark DataFrame containing data like the following:

+----+---------------------+-------+----------+-------------+
| ID |      Timestamp      | Value | Interval | Consumption |
+----+---------------------+-------+----------+-------------+
|  1 | 2012-05-02 12:30:00 |   550 |        1 |           5 |
|  1 | 2012-05-02 12:45:00 |   551 |        1 |           1 |
|  1 | 2012-05-02 13:00:00 |   554 |        1 |           3 |
|  1 | 2012-05-02 14:00:00 |   578 |        4 |          24 |
|  1 | 2012-05-02 14:15:00 |   578 |        1 |           0 |
|  1 | 2012-05-02 14:30:00 |   584 |        1 |           6 |
+----+---------------------+-------+----------+-------------+

I want to turn it into something like this:

+----+---------------------+-------+----------+-------------+------------+
| ID |      Timestamp      | Value | Interval | Consumption | Estimation |
+----+---------------------+-------+----------+-------------+------------+
|  1 | 2012-05-02 12:30:00 |   550 |        1 |           5 | ?          |
|  1 | 2012-05-02 12:45:00 |   551 |        1 |           1 | ?          |
|  1 | 2012-05-02 13:00:00 |   554 |        1 |           3 | ?          |
|  1 | 2012-05-02 13:15:00 |   560 |        1 |           6 | 4          |
|  1 | 2012-05-02 13:30:00 |   566 |        1 |           6 | 4          |
|  1 | 2012-05-02 13:45:00 |   572 |        1 |           6 | 4          |
|  1 | 2012-05-02 14:00:00 |   578 |        1 |           6 | 4          |
|  1 | 2012-05-02 14:15:00 |   578 |        1 |           0 | ?          |
|  1 | 2012-05-02 14:30:00 |   584 |        1 |           6 | ?          |
+----+---------------------+-------+----------+-------------+------------+

More specifically, I want to turn this:

+----+---------------------+-------+----------+-------------+
| ID |      Timestamp      | Value | Interval | Consumption |
+----+---------------------+-------+----------+-------------+
|  1 | 2012-05-02 14:00:00 |   578 |        4 |          24 |
+----+---------------------+-------+----------+-------------+

into this:

+----+---------------------+-------+----------+-------------+------------+
| ID |      Timestamp      | Value | Interval | Consumption | Estimation |
+----+---------------------+-------+----------+-------------+------------+
|  1 | 2012-05-02 13:15:00 |   560 |        1 |           6 | 4          |
|  1 | 2012-05-02 13:30:00 |   566 |        1 |           6 | 4          |
|  1 | 2012-05-02 13:45:00 |   572 |        1 |           6 | 4          |
|  1 | 2012-05-02 14:00:00 |   578 |        1 |           6 | 4          |
+----+---------------------+-------+----------+-------------+------------+

I want to take the rows from the original table whose Interval is greater than 1, interpolate values for the missing intervals, and insert the newly created rows back into the table where the original row sat. I know how to do this elsewhere (in PostgreSQL, for example, I would just use the generate_series() function to create the required timestamps and compute the new values), but pulling this off in Spark/Scala has been a struggle.
Suppose I create a new DataFrame containing only the rows with Interval > 1: how can I replicate each of those rows n times, where n is its Interval value? I believe that, together with a counter partitioned by some row reference I can create, would be enough to get me started.
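
One way to get exactly that replication plus counter is to explode a generated array (Spark 2.4+, where `sequence()` is available). A minimal sketch, assuming the schema from the sample above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("explode-rows").master("local[*]").getOrCreate()
import spark.implicits._

// Toy frame matching the sample schema above.
val df = Seq(
  (1, "2012-05-02 13:00:00", 554L, 1, 3L),
  (1, "2012-05-02 14:00:00", 578L, 4, 24L),
  (1, "2012-05-02 14:15:00", 578L, 1, 0L)
).toDF("ID", "Timestamp", "Value", "Interval", "Consumption")
  .withColumn("Timestamp", to_timestamp($"Timestamp"))

// sequence(1, Interval) builds [1, 2, ..., Interval]; explode() then emits
// one copy of the row per element, numbered by `step` -- the counter asked
// about above, with no window function needed.
val replicated = df
  .filter($"Interval" > 1)
  .withColumn("step", explode(sequence(lit(1), $"Interval")))

replicated.show(truncate = false)
```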
Even better would be some way of reproducing the generate_series() behaviour that I seem to be missing.
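
Continuing from the snippet above, `sequence()` plus a bit of timestamp arithmetic can stand in for generate_series(). A sketch under two stated assumptions: the readings sit on a fixed 15-minute grid (true for the sample data), and Consumption divides evenly across the gap; null stands in for the "?" placeholders:

```scala
// Continues from `df` and `replicated` above. Assumes a fixed 15-minute
// grid and that Consumption divides evenly over the gap (both hold for
// the sample data; real data may need rounding rules).
val stepMinutes = 15L

val expanded = replicated
  .withColumn("perStep", ($"Consumption" / $"Interval").cast("long"))
  // The k-th copy sits (Interval - k) grid steps before the recorded reading.
  .withColumn("Timestamp",
    (unix_timestamp($"Timestamp") - ($"Interval" - $"step") * stepMinutes * 60L)
      .cast("timestamp"))
  .withColumn("Value", $"Value" - ($"Interval" - $"step") * $"perStep")
  .withColumn("Estimation", $"Interval") // record how wide the estimated gap was
  .withColumn("Consumption", $"perStep")
  .withColumn("Interval", lit(1))
  .drop("step", "perStep")

// Rows already on the 1-interval grid keep a null Estimation (the "?" in
// the desired output); unionByName stitches everything back together.
val result = df
  .filter($"Interval" === 1)
  .withColumn("Estimation", lit(null).cast("int"))
  .unionByName(expanded)
  .orderBy($"ID", $"Timestamp")

result.show(truncate = false)
```

On the sample row (Value 578, Interval 4, Consumption 24) this yields the four rows 13:15/560, 13:30/566, 13:45/572, 14:00/578, each with Consumption 6 and Estimation 4, matching the desired output.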
