pyspark-获取1和0序列的第一个值

r7xajy2e  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(519)

我要第一个 indicator 并为每组ID创建一个新的指示符。将有0的长序列,但1序列的第一个1需要有一个称为 first_indicator .

  1. dataframe=spark.createDataFrame([("B2", "2019-11-19 12:07:38", 1), ("B2", "2019-11-19 12:24:25", 1),
  2. ("B2", "2019-11-19 12:37:58", 0), ("B2", "2019-11-19 12:55:08", 1),
  3. ("B2", "2019-11-19 13:07:28", 1), ("B2", "2019-11-19 13:20:28", 0),
  4. ("F9", "2020-02-02 13:06:36", 0), ("F9", "2020-02-02 13:21:37", 1),
  5. ("F9", "2020-02-02 13:36:38", 1), ("F9", "2020-02-02 13:45:32", 0),
  6. ("F9", "2020-02-02 14:06:32", 1), ("F9", "2020-02-02 14:24:31", 1)],
  7. ["id", "date_time", "indicator"]).show()
  8. +---+-------------------+---------+
  9. | id| date_time|indicator|
  10. +---+-------------------+---------+
  11. | B2|2019-11-19 12:07:38| 1|
  12. | B2|2019-11-19 12:24:25| 1|
  13. | B2|2019-11-19 12:37:58| 0|
  14. | B2|2019-11-19 12:55:08| 1|
  15. | B2|2019-11-19 13:07:28| 1|
  16. | B2|2019-11-19 13:20:28| 0|
  17. | F9|2020-02-02 13:06:36| 0|
  18. | F9|2020-02-02 13:21:37| 1|
  19. | F9|2020-02-02 13:36:38| 1|
  20. | F9|2020-02-02 13:45:32| 0|
  21. | F9|2020-02-02 14:06:32| 1|
  22. | F9|2020-02-02 14:24:31| 1|
  23. +---+-------------------+---------+

所需Dataframe:

  1. +---+-------------------+---------+---------------+
  2. | id| date_time|indicator|first_indicator|
  3. +---+-------------------+---------+---------------+
  4. | B2|2019-11-19 12:07:38| 1| 1|
  5. | B2|2019-11-19 12:24:25| 1| 0|
  6. | B2|2019-11-19 12:37:58| 0| 0|
  7. | B2|2019-11-19 12:55:08| 1| 1|
  8. | B2|2019-11-19 13:07:28| 1| 0|
  9. | B2|2019-11-19 13:20:28| 0| 0|
  10. | F9|2020-02-02 13:06:36| 0| 0|
  11. | F9|2020-02-02 13:21:37| 1| 1|
  12. | F9|2020-02-02 13:36:38| 1| 0|
  13. | F9|2020-02-02 13:45:32| 0| 0|
  14. | F9|2020-02-02 14:06:32| 1| 1|
  15. | F9|2020-02-02 14:24:31| 1| 0|
  16. +---+-------------------+---------+---------------+
ifsvaxew

ifsvaxew1#

我建议您按“id”分组,并将“date\u time”和“indicator”收集在一个列表中,这样您就可以得到如下结果:

  1. +---+---------------------------------------------------------+
  2. | id| array |
  3. +---+---------------------------------------------------------+
  4. | B2|[(2019-11-19 12:07:38, 1), (2019-11-19 12:24:25, 1) ... ]|
  5. | F9|[(2020-02-02 13:06:36, 0), (2020-02-02 13:21:37, 0) ... ]|
  6. +---+---------------------------------------------------------+

接下来,您可以构建自己的自定义项,返回第一个指标的记录。在这个udf中,您不需要处理Dataframe,因此要应用的算法考虑起来更“自然”。

y0u0uwnf

y0u0uwnf2#

您可以使用窗口按Dataframe进行分区和排序,然后使用lag函数比较前一个值为0和当前值为1。

  1. w = Window.partitionBy('id').orderBy('date_time')
  2. df.withColumn('target', ((lag('indicator', 1, 0).over(w) == 0) & (col('indicator') == 1)).cast('int')).show()
  3. +---+-------------------+---------+------+
  4. | id| date_time|indicator|target|
  5. +---+-------------------+---------+------+
  6. | B2|2019-11-19 12:07:38| 1| 1|
  7. | B2|2019-11-19 12:24:25| 1| 0|
  8. | B2|2019-11-19 12:37:58| 0| 0|
  9. | B2|2019-11-19 12:55:08| 1| 1|
  10. | B2|2019-11-19 13:07:28| 1| 0|
  11. | B2|2019-11-19 13:20:28| 0| 0|
  12. | F9|2020-02-02 13:06:36| 0| 0|
  13. | F9|2020-02-02 13:21:37| 1| 1|
  14. | F9|2020-02-02 13:36:38| 1| 0|
  15. | F9|2020-02-02 13:45:32| 0| 0|
  16. | F9|2020-02-02 14:06:32| 1| 1|
  17. | F9|2020-02-02 14:24:31| 1| 0|
  18. +---+-------------------+---------+------+
展开查看全部

相关问题