我有一个数据框架,其中包含与各种实体相关的日常事件。我想填补那些时间序列中的空白。
这是我拥有的聚合数据(左侧),右侧是我想要拥有的数据:
+---------+----------+-------+ +---------+----------+-------+
|entity_id| date|counter| |entity_id| date|counter|
+---------+----------+-------+ +---------+----------+-------+
| 3|2020-01-01| 7| | 3|2020-01-01| 7|
| 1|2020-01-01| 10| | 1|2020-01-01| 10|
| 2|2020-01-01| 3| | 2|2020-01-01| 3|
| 2|2020-01-02| 9| | 2|2020-01-02| 9|
| 1|2020-01-03| 15| | 1|2020-01-02| 0|
| 2|2020-01-04| 3| | 3|2020-01-02| 0|
| 1|2020-01-04| 14| | 1|2020-01-03| 15|
| 2|2020-01-05| 6| | 2|2020-01-03| 0|
+---------+----------+-------+ | 3|2020-01-03| 0|
| 3|2020-01-04| 0|
| 2|2020-01-04| 3|
| 1|2020-01-04| 14|
| 2|2020-01-05| 6|
| 1|2020-01-05| 0|
| 3|2020-01-05| 0|
+---------+----------+-------+
我使用了这个堆栈溢出主题,它非常有用:填补timeseries spark中的空白
这是我的代码(只对一个实体进行过滤),它是用python编写的,但我认为在scala中api是一样的:
(
df
.withColumn("date", sf.to_date("created_at"))
.groupBy(
sf.col("entity_id"),
sf.col("date")
)
.agg(sf.count(sf.lit(1)).alias("counter"))
.filter(sf.col("entity_id") == 1)
.select(
sf.col("date"),
sf.col("counter")
)
.join(
spark
.range(
df # range start
.filter(sf.col("entity_id") == 1)
.select(sf.unix_timestamp(sf.min("created_at")).alias("min"))
.first().min // a * a, # a = 60 * 60 * 24 = seconds in one day
(df # range end
.filter(sf.col("entity_id") == 1)
.select(sf.unix_timestamp(sf.max("created_at")).alias("max"))
.first().max // a + 1) * a,
a # range step, a = 60 * 60 * 24 = seconds in one day
)
.select(sf.to_date(sf.from_unixtime("id")).alias("date")),
["date"], # column which will be used for the join
how="right" # type of join
)
.withColumn("counter", sf.when(sf.isnull("counter"), 0).otherwise(sf.col("counter")))
.sort(sf.col("date"))
.show(200)
)
这项工作做得很好,但现在我想避免 filter
做一个范围来填补每个实体的时间序列空白( entity_id == 2
, entity_id == 3
, ...). 供您参考,取决于 entity_id
值,列的最小值和最大值 date
可以不同,但是如果您的帮助涉及整个Dataframe的全局最小值和最大值,我也可以。
如果您需要任何其他信息,请随时询问。
编辑:添加我想要的数据示例
1条答案
按热度按时间92vpleto1#
在创建日期范围的元素时,我宁愿使用pandas函数而不是spark range,因为spark range函数在处理日期值时有一些缺点。不同日期的数量通常很小。即使处理的时间跨度为多年,不同日期的数量也非常少,因此可以很容易地在连接中广播。
这给