PySpark Structured Streaming: show only the window start from [window_start, window_end]

eeq64g8w · asked on 2021-07-09 · in Spark

I am reading a list of CSV files as a stream and aggregating over a 1-hour window on the timestamp column.

import pyspark.sql.functions as F

stream = streaming.selectExpr("car", "cost", "timestamp")\
    .withWatermark("timestamp", "30 seconds")\
    .groupBy(F.col("car"), F.window("timestamp", "1 hour").alias("tmst_window"))\
    .agg(F.sum("cost").alias("agg_cost"))

    +--------------+------------------------------------------+------------------+
    |car           |tmst_window                               |agg_cost          |
    +--------------+------------------------------------------+------------------+
    |Toyota        |[2010-12-01 14:00:00, 2010-12-01 15:00:00]|10                |
    |Ford          |[2010-12-01 14:00:00, 2010-12-01 15:00:00]|20                |
    |Audi          |[2010-12-01 13:00:00, 2010-12-01 14:00:00]|30                |
    +--------------+------------------------------------------+------------------+

How can I display only the window start rather than both the start and end timestamps? I want the results in real time, so I don't want to store them in a temporary DataFrame and then split/explode the data afterwards. How can I rewrite the streaming query above to produce the output below?

    +--------------+--------------------------------------+------------------+
    |car           |tmst_window                           |agg_cost          |
    +--------------+--------------------------------------+------------------+
    |Toyota        |2010-12-01 14:00:00                   |10                |
    |Ford          |2010-12-01 14:00:00                   |20                |
    |Audi          |2010-12-01 13:00:00                   |30                |
    +--------------+--------------------------------------+------------------+

drnojrws1#

If you print the schema of stream, you will notice that tmst_window is a struct type containing start and end elements:

root
 |-- car: string (nullable = true)
 |-- tmst_window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- agg_cost: double (nullable = true)

So you can select the start element with F.col('tmst_window')['start'] or, a bit more simply, F.col('tmst_window.start').
