python时间序列数据

nr9pn0ug  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(470)

我们有2个数据集,我想生成一个结果表数据集。如何使用pyspark或spark+scalaxx生成结果数据集
数据是为日志文件,我想得到的数据,这是有2列,其中它将显示开始日期和结束日期与周期\状态列。
失败

+-------------------+
| fail_date         |
+-------------------+
| 2018-12-28        |
| 2018-12-29        |
| 2019-01-04        |
| 2019-01-05        |
+-------------------+

成功

+-------------------+
| success_date      | 
+-------------------+
| 2018-12-30        |
| 2018-12-31        |
| 2019-01-01        |
| 2019-01-02        |
| 2019-01-03        |
| 2019-01-06        |
+-------------------+

结果表:

+--------------+--------------+--------------+
| period_state | start_date   | end_date     |
+--------------+--------------+--------------+
| succeeded    | 2019-01-01   | 2019-01-03   |
| failed       | 2019-01-04   | 2019-01-05   |
| succeeded    | 2019-01-06   | 2019-01-06   |
+--------------+--------------+--------------+
t98cgbkg

t98cgbkg1#

不需要自定义项。只需使用如下窗口函数:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df = fail.withColumn('state', F.lit(0)).union(
    success.withColumn('state', F.lit(1))
).toDF('date', 'state')

df.show()
+----------+-----+
|      date|state|
+----------+-----+
|2018-12-28|    0|
|2018-12-29|    0|
|2019-01-04|    0|
|2019-01-05|    0|
|2018-12-30|    1|
|2018-12-31|    1|
|2019-01-01|    1|
|2019-01-02|    1|
|2019-01-03|    1|
|2019-01-06|    1|
+----------+-----+
df2 = df.withColumn(
    'begin',
    F.coalesce(
        F.lag('state').over(Window.orderBy('date')) != F.col('state'), 
        F.lit(True)
    )
).withColumn(
    'end',
    F.coalesce(
        F.lead('state').over(Window.orderBy('date')) != F.col('state'), 
        F.lit(True)
    )
).withColumn(
    'last_change_date',
    F.last(
        F.when(F.col('begin'), F.col('date')), ignorenulls=True
    ).over(Window.orderBy('date'))
).filter(
    'end = true'
).select(
    F.when(
        F.col('state') == 1,
        F.lit('succeeded')
    ).otherwise(
        F.lit('failed')
    ).alias('period_state'),
    F.col('last_change_date').alias('start_date'), 
    F.col('date').alias('end_date')
)
df2.show()
+------------+----------+----------+
|period_state|start_date|  end_date|
+------------+----------+----------+
|      failed|2018-12-28|2018-12-29|
|   succeeded|2018-12-30|2019-01-03|
|      failed|2019-01-04|2019-01-05|
|   succeeded|2019-01-06|2019-01-06|
+------------+----------+----------+

如果您对中间结果感兴趣:

+----------+-----+-----+-----+----------------+
|      date|state|begin|  end|last_change_date|
+----------+-----+-----+-----+----------------+
|2018-12-28|    0| true|false|      2018-12-28|
|2018-12-29|    0|false| true|      2018-12-28|
|2018-12-30|    1| true|false|      2018-12-30|
|2018-12-31|    1|false|false|      2018-12-30|
|2019-01-01|    1|false|false|      2018-12-30|
|2019-01-02|    1|false|false|      2018-12-30|
|2019-01-03|    1|false| true|      2018-12-30|
|2019-01-04|    0| true|false|      2019-01-04|
|2019-01-05|    0|false| true|      2019-01-04|
|2019-01-06|    1| true| true|      2019-01-06|
+----------+-----+-----+-----+----------------+

相关问题