填补不同实体的时间序列spark中的空白

cgvd09ve  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(336)

我有一个数据框架,其中包含与各种实体相关的日常事件。我想填补那些时间序列中的空白。
这是我拥有的聚合数据(左侧),右侧是我想要拥有的数据:

+---------+----------+-------+               +---------+----------+-------+
|entity_id|      date|counter|               |entity_id|      date|counter|
+---------+----------+-------+               +---------+----------+-------+
|        3|2020-01-01|      7|               |        3|2020-01-01|      7|
|        1|2020-01-01|     10|               |        1|2020-01-01|     10|
|        2|2020-01-01|      3|               |        2|2020-01-01|      3|
|        2|2020-01-02|      9|               |        2|2020-01-02|      9|
|        1|2020-01-03|     15|               |        1|2020-01-02|      0|
|        2|2020-01-04|      3|               |        3|2020-01-02|      0|
|        1|2020-01-04|     14|               |        1|2020-01-03|     15|
|        2|2020-01-05|      6|               |        2|2020-01-03|      0|
+---------+----------+-------+               |        3|2020-01-03|      0|
                                             |        3|2020-01-04|      0|
                                             |        2|2020-01-04|      3|
                                             |        1|2020-01-04|     14|
                                             |        2|2020-01-05|      6|
                                             |        1|2020-01-05|      0|
                                             |        3|2020-01-05|      0|
                                             +---------+----------+-------+

我使用了这个堆栈溢出主题,它非常有用:填补timeseries spark中的空白
这是我的代码(只对一个实体进行过滤),它是用python编写的,但我认为在scala中api是一样的:

(
    df
    .withColumn("date", sf.to_date("created_at"))
    .groupBy(
        sf.col("entity_id"),
        sf.col("date")
    )
    .agg(sf.count(sf.lit(1)).alias("counter"))
    .filter(sf.col("entity_id") == 1)
    .select(
        sf.col("date"),
        sf.col("counter")
    )
    .join(
        spark
        .range(
            df # range start
            .filter(sf.col("entity_id") == 1)
            .select(sf.unix_timestamp(sf.min("created_at")).alias("min"))
            .first().min // a * a, # a = 60 * 60 * 24 = seconds in one day

            (df # range end
            .filter(sf.col("entity_id") == 1)
            .select(sf.unix_timestamp(sf.max("created_at")).alias("max"))
            .first().max // a + 1) * a,

            a # range step, a = 60 * 60 * 24 = seconds in one day
        )
        .select(sf.to_date(sf.from_unixtime("id")).alias("date")),
        ["date"], # column which will be used for the join
        how="right" # type of join
    )
    .withColumn("counter", sf.when(sf.isnull("counter"), 0).otherwise(sf.col("counter")))
    .sort(sf.col("date"))
    .show(200)
)

这项工作做得很好,但现在我想避免 filter 做一个范围来填补每个实体的时间序列空白( entity_id == 2 , entity_id == 3 , ...). 供您参考,取决于 entity_id 值,列的最小值和最大值 date 可以不同,但是如果您的帮助涉及整个Dataframe的全局最小值和最大值,我也可以。
如果您需要任何其他信息,请随时询问。
编辑:添加我想要的数据示例

92vpleto

92vpleto1#

在创建日期范围的元素时,我宁愿使用pandas函数而不是spark range,因为spark range函数在处理日期值时有一些缺点。不同日期的数量通常很小。即使处理的时间跨度为多年,不同日期的数量也非常少,因此可以很容易地在连接中广播。


# get the minimun and maximun date and collect it to the driver

min_date, max_date = df.select(F.min("date"), F.max("date")).first()

# use Pandas to create all dates and switch back to PySpark DataFrame

from pandas import pandas as pd
timerange = pd.date_range(start=min_date, end=max_date, freq='1d')
all_dates = spark.createDataFrame(timerange.to_frame(),['date'])

# get all combinations of dates and entity_ids

all_dates_and_ids = all_dates.crossJoin(df.select("entity_id").distinct())

# create the final result by doing a left join and filling null values with 0

result = all_dates_and_ids.join(df, on=['date', 'entity_id'], how="left_outer")\
    .fillna({'counter':'0'}) \
    .orderBy(['date', 'entity_id'])

这给

+-------------------+---------+-------+
|               date|entity_id|counter|
+-------------------+---------+-------+
|2020-01-01 00:00:00|        1|     10|
|2020-01-01 00:00:00|        2|      3|
|2020-01-01 00:00:00|        3|      7|
|2020-01-02 00:00:00|        1|      0|
|2020-01-02 00:00:00|        2|      9|
|2020-01-02 00:00:00|        3|      0|
|2020-01-03 00:00:00|        1|     15|
|2020-01-03 00:00:00|        2|      0|
|2020-01-03 00:00:00|        3|      0|
|2020-01-04 00:00:00|        1|     14|
|2020-01-04 00:00:00|        2|      3|
|2020-01-04 00:00:00|        3|      0|
|2020-01-05 00:00:00|        1|      0|
|2020-01-05 00:00:00|        2|      6|
|2020-01-05 00:00:00|        3|      0|
+-------------------+---------+-------+

相关问题