如何在分组数据中插入PySpark Dataframe ?
例如:
我有一个PySpark Dataframe ,其中包含以下列:
+--------+-------------------+--------+
|webID |timestamp |counts |
+--------+-------------------+--------+
|John |2018-02-01 03:00:00|60 |
|John |2018-02-01 03:03:00|66 |
|John |2018-02-01 03:05:00|70 |
|John |2018-02-01 03:08:00|76 |
|Mo |2017-06-04 01:05:00|10 |
|Mo |2017-06-04 01:07:00|20 |
|Mo |2017-06-04 01:10:00|35 |
|Mo |2017-06-04 01:11:00|40 |
+--------+----------------- -+--------+
我需要将John和Mo的计数数据在各自的时间间隔内每分钟插值到一个数据点,我可以接受任何简单的线性插值--但请注意,我的真实的数据是每隔几秒进行一次,我希望插值到每一秒。
所以结果应该是:
+--------+-------------------+--------+
|webID |timestamp |counts |
+--------+-------------------+--------+
|John |2018-02-01 03:00:00|60 |
|John |2018-02-01 03:01:00|62 |
|John |2018-02-01 03:02:00|64 |
|John |2018-02-01 03:03:00|66 |
|John |2018-02-01 03:04:00|68 |
|John |2018-02-01 03:05:00|70 |
|John |2018-02-01 03:06:00|72 |
|John |2018-02-01 03:07:00|74 |
|John |2018-02-01 03:08:00|76 |
|Mo |2017-06-04 01:05:00|10 |
|Mo |2017-06-04 01:06:00|15 |
|Mo |2017-06-04 01:07:00|20 |
|Mo |2017-06-04 01:08:00|25 |
|Mo |2017-06-04 01:09:00|30 |
|Mo |2017-06-04 01:10:00|35 |
|Mo |2017-06-04 01:11:00|40 |
+--------+----------------- -+--------+
新的行需要添加到我的原始 Dataframe 。正在寻找PySpark解决方案。
4条答案
按热度按时间9q78igpj1#
如果你使用Python,最简单的方法就是重用现有的Pandas函数,使用
GROUPED_MAP
udf:应用于您的数据:
它产生
这是在假设单个
webID
的输入数据和插值数据都适合单个节点的内存的情况下进行的(通常,其他精确的非迭代解决方案必须做出类似的假设)。如果不是这种情况,则可以通过采用重叠窗口轻松地进行近似以及聚合最终结果
这当然是昂贵得多的(有两次混洗,并且一些值将被计算多次),但是如果重叠不足够大以包括下一个观测,则也会留下间隙。
pkwftd7m2#
解决此问题的本地pyspark实现(无udf)是:
我最近写了一篇博文,解释了这种方法,并展示了这种方法与上面提到的Pandasudf方法相比,在大数据集上的伸缩性要好得多:https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1
qybjjes13#
这不是一个
Python
解决方案,但我认为下面的Scala
解决方案可以在Python
中使用类似的方法实现,它涉及使用lag
Window函数在每行中创建一个时间范围,并使用UDF通过java.time
API将时间范围扩展为per-minute
时间序列和插值计数的列表,然后使用Spark的explode
方法将其展平:kx5bkwkv4#
我扩展了@大卫的伟大答案,并使其动态化,以便在更广泛的 Dataframe 上使用,并使其可重用。
它将组和时间列名作为输入变量:
然后动态地检测 Dataframe 中存在哪些其他列,然后动态地创建选择表达式和大卫提出的插值表达式:
通过使用
*
-〉*col_create_expr
和*interpol_expr
对列表进行解包,可以将表达式添加到插值方法中: