以10分钟的间隔聚合python sparkDataframe的行

t0ybt7op  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(334)

我可以在pyspark python中实现这一点吗?不管优化与否。拜托,我被封锁了
初始csv
日期时间idSensor1值传感器1idSensor2值传感器22021011310:01:01.000000171517382021011310:05:05.1111111 71617282021011310:08.22222 171417282021011310:10:10.333333171617282021011310:15:15.4444 1718172102011310:18.5555171917210
我们把这些值加起来​​将valuesensor1和valuesensor2列的最后10分钟
DateTimeIDSensor1传感器值传感器1传感器值传感器2间期值传感器1间期值传感器2021011310:01:01.00000017151738[]2021011310:05:05.1111111 7161728[5][8]2021011310:08:08.22222 17141728[5,6][8,8]2021011310:10.33333317161728[5,6,4][8,8]2021011310:15.4444 171817210[6,6][8,8,8]2021011310:18.5555 171917210[4,6,8][8,10]

我们把这些值加起来​​过去10分钟的值传感器1和值传感器2列之和

DateTimeIDSensor1值传感器1值传感器2接口2021011310:01:01.00000017151738[]2021011310:05:05.1111111 7161728[13]2021011310:08:08.22222 17141728[13,14]2021011310:10:10.333333 17161728[13,14,12]2021011310:15:15.4444 171817210[14,12,14]2021011310:18.5555 171917210[12,14,18]

piztneat

piztneat1#

你可以用 collect_list 在由unix时间戳排序的窗口上,范围从当前行之前的600秒(10分钟)到当前行之前的1秒:

df2 = df.withColumn(
    'intervalsValuesSensor1', 
    F.collect_list('valueSensor1').over(
        Window.partitionBy('idSensor1')
              .orderBy(F.unix_timestamp(F.concat('Date', 'Time'), 'yyyyMMddHH:mm:ss.SSSSSS'))
              .rangeBetween(-600, -1)
    )
)

df2.show()
+--------+---------------+---------+------------+---------+------------+----------------------+
|    Date|           Time|idSensor1|valueSensor1|idSensor2|valueSensor2|intervalsValuesSensor1|
+--------+---------------+---------+------------+---------+------------+----------------------+
|20210113|10:01:01.000000|      171|           5|      173|           8|                    []|
|20210113|10:05:05.111111|      171|           6|      172|           8|                   [5]|
|20210113|10:08:08.222222|      171|           4|      172|           8|                [5, 6]|
|20210113|10:10:10.333333|      171|           6|      172|           8|             [5, 6, 4]|
|20210113|10:15:15.444444|      171|           8|      172|          10|                [4, 6]|
|20210113|10:18:18.555555|      171|           9|      172|          10|                [6, 8]|
+--------+---------------+---------+------------+---------+------------+----------------------+

同样的,你也可以这样做 Sensor2 . 总之,你可以 F.collect_list(F.col('valueSensor1') + F.col('valueSensor2'))

相关问题