SQL / PySpark - Add a new column based on a dynamic timestamp and another column

ggazkfy8 posted on 2024-01-06 in Spark

I have this data:

  id, name, timestamp
  1, David, 2022/01/01 10:00
  2, David, 2022/01/01 10:30
  3, Diego, 2022/01/01 10:59
  4, David, 2022/01/01 10:59
  5, David, 2022/01/01 11:01
  6, Diego, 2022/01/01 12:00
  7, David, 2022/01/01 12:00
  8, David, 2022/01/01 12:05
  9, Diego, 2022/01/01 12:30

Basically, David and Diego are playing a game. From time to time they press a button, at those timestamps.
A game lasts for one hour after the first time they press the button. After that the counter resets, and if they press the button again it counts as starting to play again.
So I want to flag a press with 0 (start) when it is the first press of an hour-long game, and with 1 (playing) when it happens within that hour.
So in my case, I would expect this result:

  id, name, timestamp, status
  1, David, 2022/01/01 10:00, 0 <--- David starts playing
  2, David, 2022/01/01 10:30, 1 <--- David keeps playing the game he started at id 1
  3, Diego, 2022/01/01 10:59, 0 <--- Diego starts playing
  4, David, 2022/01/01 10:59, 1 <--- David keeps playing the game he started at id 1
  5, David, 2022/01/01 11:01, 0 <--- David starts playing again
  6, Diego, 2022/01/01 12:00, 0 <--- Diego starts playing again
  7, David, 2022/01/01 12:00, 1 <--- David keeps playing the game he started at id 5
  8, David, 2022/01/01 12:05, 0 <--- David starts playing again
  9, Diego, 2022/01/01 12:30, 1 <--- Diego keeps playing the game he started at id 6
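
To make the rule concrete: for a single player's sorted timestamps, the flag would behave like this plain-Python sketch (compute_status is only an illustrative helper, not something I already have):

  from datetime import datetime, timedelta

  def compute_status(timestamps):
      # 0 = this press starts a new one-hour game, 1 = still inside the current game
      statuses = []
      game_start = None
      for ts in sorted(timestamps):
          if game_start is None or ts - game_start > timedelta(hours=1):
              game_start = ts        # first press, or the previous game has expired
              statuses.append(0)
          else:
              statuses.append(1)
      return statuses

  david = [datetime(2022, 1, 1, h, m)
           for h, m in [(10, 0), (10, 30), (10, 59), (11, 1), (12, 0), (12, 5)]]
  print(compute_status(david))  # -> [0, 1, 1, 0, 1, 0] (ids 1, 2, 4, 5, 7, 8 above)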


I need to do this transformation in PySpark, just to flag what I consider start playing vs. keep playing.
Could you help me with a SQL query? I can adapt it to PySpark later.
It doesn't need to be done in a single query/step.

lrl1mhuk 1#

This isn't a complete solution, but as an idea, this is what I've tried:

  from datetime import datetime
  from pyspark.sql.types import StructType, StructField, StringType, TimestampType

  # Recreate the sample data
  schema = StructType([
      StructField('id', StringType(), True),
      StructField('name', StringType(), True),
      StructField('timestamp', TimestampType(), True),
  ])
  df = spark.createDataFrame(
      [
          ("1", "David", datetime.strptime("2022/01/01 10:00", '%Y/%m/%d %H:%M')),
          ("2", "David", datetime.strptime("2022/01/01 10:30", '%Y/%m/%d %H:%M')),
          ("3", "Diego", datetime.strptime("2022/01/01 10:59", '%Y/%m/%d %H:%M')),
          ("4", "David", datetime.strptime("2022/01/01 10:59", '%Y/%m/%d %H:%M')),
          ("5", "David", datetime.strptime("2022/01/01 11:01", '%Y/%m/%d %H:%M')),
          ("6", "Diego", datetime.strptime("2022/01/01 12:00", '%Y/%m/%d %H:%M')),
          ("7", "David", datetime.strptime("2022/01/01 12:00", '%Y/%m/%d %H:%M')),
          ("8", "David", datetime.strptime("2022/01/01 12:05", '%Y/%m/%d %H:%M')),
          ("9", "Diego", datetime.strptime("2022/01/01 12:30", '%Y/%m/%d %H:%M')),
      ],
      schema=schema)
  df.createOrReplaceTempView("people")

  # Rank the presses within each clock hour and derive a 0/1 flag from the rank's parity
  df3 = spark.sql("""
      select *,
             dense_rank() over (partition by hour(timestamp) order by name, timestamp) % 2 as t4,
             case when dense_rank() over (partition by hour(timestamp) order by name, timestamp) % 2 > 0
                  then dense_rank() over (partition by hour(timestamp) order by name, timestamp) % 2 - 1
                  else dense_rank() over (partition by hour(timestamp) order by name, timestamp) % 2 + 1
             end as t3
      from people
      order by timestamp, name""")
  df3.show()
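
Since dense_rank over hour(timestamp) ties the games to clock hours rather than to each player's first press, a fuller approach (just a sketch, assuming Spark 3.0+ and reusing the df and schema defined above; flag_games is an illustrative name) could compute the flag per player with applyInPandas:

  import pandas as pd
  from pyspark.sql.types import IntegerType, StructField, StructType

  # Output schema = the input columns plus the computed status column
  result_schema = StructType(schema.fields + [StructField("status", IntegerType(), True)])

  def flag_games(pdf: pd.DataFrame) -> pd.DataFrame:
      # One pandas DataFrame per player: walk the presses in time order and
      # start a new game whenever more than one hour has passed since the
      # current game's first press.
      pdf = pdf.sort_values("timestamp")
      statuses = []
      game_start = None
      for ts in pdf["timestamp"]:
          if game_start is None or (ts - game_start).total_seconds() > 3600:
              game_start = ts
              statuses.append(0)   # starts playing
          else:
              statuses.append(1)   # keeps playing the current game
      pdf["status"] = statuses
      return pdf

  df_flagged = df.groupBy("name").applyInPandas(flag_games, schema=result_schema)
  df_flagged.orderBy("timestamp", "name").show()

On the sample data above this reproduces the status column from the question, with each 0 marking the press that opens a new one-hour game for that player.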
