我正在以流的形式读取csv文件列表,并使用1小时的间隔来存储时间戳。
import pyspark.sql.functions as F
stream = streaming.selectExpr("car", "cost", "timestamp")\
.withWatermark("timestamp", "30 seconds")\
.groupBy(F.col("car"), F.window("timestamp", "1 hour").alias("tmst_window"))\
.agg(F.sum("cost").alias("agg_cost"))
+--------------+------------------------------------------+------------------+
|car |tmst_window |agg_cost |
+--------------+------------------------------------------+------------------+
|Toyota |[2010-12-01 14:00:00, 2010-12-01 15:00:00]|10 |
|Ford |[2010-12-01 14:00:00, 2010-12-01 15:00:00]|20 |
|Audi |[2010-12-01 13:00:00, 2010-12-01 14:00:00]|30 |
如何只使用开始窗口来显示它,而不使用结束窗口时间戳?我想实时得到结果,所以我不想将其存储到临时Dataframe,然后拆分/分解数据。如何重写上面的流式查询以生成下面的查询?
+--------------+--------------------------------------+------------------+
|car |tmst_window |agg_cost |
+--------------+--------------------------------------+------------------+
|Toyota |2010-12-01 14:00:00 |10 |
|Ford |2010-12-01 14:00:00 |20 |
|Audi |2010-12-01 13:00:00 |30 |
1条答案
按热度按时间drnojrws1#
如果你打印出
stream
你会注意到tmst_window
是包含元素的结构类型start
以及end
:因此,您可以使用以下命令选择start元素
F.col('tmst_window')['start']
或者更简单一点,F.col('tmst_window.start')