Splitting data into 30-minute intervals: Pyspark

r8xiu3jd posted on 2024-01-06 in Spark

I need to split my data on a 15-minute calendar time-interval basis.
For example, the data looks like this:

  ID | rh_start_time | rh_end_time | total_duration
  5421833835 | 31-12-2023 13:26:53 | 31-12-2023 13:27:03 | 10
  5421833961 | 31-12-2023 13:23:50 | 31-12-2023 13:39:10 | 360

I want to split it into 15-minute intervals, like this:

  ID | rh_start_time | rh_end_time | total_duration | Interval Start
  5421833835 | 31-12-2023 13:26:53 | 31-12-2023 13:27:03 | 10 | 31-12-2023 13:00:00
  5421833961 | 31-12-2023 13:23:50 | 31-12-2023 13:39:10 | 360 | 31-12-2023 13:00:00
  5421833961 | 31-12-2023 13:23:50 | 31-12-2023 13:39:10 | 360 | 31-12-2023 13:30:00


I tried using explode + sequence, but it creates the rows in 15-minute chunks counted from the start time (e.g. 2023-12-31 13:26:53, 2023-12-31 13:41:53), not aligned to the actual calendar:

  intervals.withColumn(
      "rh_interval_start_ts",
      explode(expr("sequence(rh_start_time, rh_end_time, interval 30 minutes)")),
  )
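
A minimal sketch of what calendar alignment requires, assuming a DataFrame `df` with the question's columns (the names `step`, `hour_floor`, and `aligned` are illustrative, not from the question or the answer below): anchor the sequence at a truncated start time and discard marks whose interval ends before the record starts.

  from pyspark.sql import functions as F

  # Anchor the sequence at the top of the hour containing rh_start_time, then
  # keep only marks overlapping [rh_start_time, rh_end_time]. The 30-minute
  # step mirrors the attempt above.
  step = "interval 30 minutes"
  aligned = (
      df
      .withColumn("hour_floor", F.date_trunc("hour", F.col("rh_start_time")))
      .withColumn(
          "rh_interval_start_ts",
          F.explode(F.expr(f"sequence(hour_floor, rh_end_time, {step})")),
      )
      .filter(F.col("rh_interval_start_ts") >= F.col("rh_start_time") - F.expr(step))
  )
  aligned.show(truncate=False)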


6xfqseft #1

One solution is to prepare the intervals and do a join:

  from pyspark.sql import SparkSession
  from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
  from pyspark.sql.functions import col, expr
  from datetime import datetime, timedelta

  spark = SparkSession.builder.appName("example").getOrCreate()

  # Create the example DataFrame:
  schema = StructType([
      StructField("ID", StringType(), True),
      StructField("rh_start_time", TimestampType(), True),
      StructField("rh_end_time", TimestampType(), True),
      StructField("total_duration", IntegerType(), True)
  ])

  def to_ts(date):
      return datetime.strptime(date, "%d-%m-%Y %H:%M:%S")

  data = [
      ("5421833835", to_ts("31-12-2023 13:26:53"), to_ts("31-12-2023 13:27:03"), 10),
      ("5421833961", to_ts("31-12-2023 13:23:50"), to_ts("31-12-2023 13:39:10"), 360)
  ]
  data = spark.createDataFrame(data, schema=schema)
  data.show()

  # Create all interval starts (if necessary, you can derive the min and max from the data):
  start_date = datetime(2023, 12, 31)
  end_date = datetime(2024, 1, 1)
  interval = timedelta(minutes=15)
  timestamps = [start_date + i * interval
                for i in range(int((end_date - start_date).total_seconds() // (15 * 60)) + 1)]
  raw_ts = [(timestamp,) for timestamp in timestamps]
  column = "interval_start"
  intervals = spark.createDataFrame(raw_ts, [column])
  intervals = intervals.withColumn(column, col(column).cast(TimestampType()))
  intervals.show()

  # Join: keep every interval start that overlaps the [rh_start_time, rh_end_time] range.
  result = data.join(intervals, on=(
      (intervals["interval_start"] >= data["rh_start_time"] - expr("INTERVAL 15 MINUTES"))
      & (intervals["interval_start"] <= data["rh_end_time"])
  ))
  result.show()
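
The comment above notes that the bounds could instead be derived from the data. A minimal sketch of that idea, assuming the same `data` DataFrame (the names `bounds`, `lo`, and `hi` are illustrative), using Spark's own sequence/explode to build the interval starts rather than a Python loop:

  from pyspark.sql import functions as F

  # Derive day-aligned bounds from the data, then generate every 15-minute
  # calendar mark between them in a single pass.
  bounds = data.select(
      F.date_trunc("day", F.min("rh_start_time")).alias("lo"),
      F.date_trunc("day", F.max("rh_end_time") + F.expr("INTERVAL 1 DAY")).alias("hi"),
  )
  intervals = bounds.select(
      F.explode(F.expr("sequence(lo, hi, interval 15 minutes)")).alias("interval_start")
  )
  intervals.show()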

Alternatively, you can explode from the floor of the start time:

  from pyspark.sql import functions as F

  data.withColumn(
      'start_floor',
      # Floor the start time to the previous 15-minute boundary via epoch-seconds arithmetic.
      (F.floor(F.col('rh_start_time').cast('integer') / (60 * 15)) * (60 * 15)).cast('timestamp')
  ).withColumn(
      "interval_start",
      # One row per 15-minute calendar mark between the floored start and the end time.
      F.explode(F.expr("sequence(start_floor, rh_end_time, interval 15 minutes)")),
  ).show()
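
Since the floor trick only depends on the interval length (the title mentions 30 minutes, the body 15), it can be wrapped in a small helper. A sketch under that assumption; `add_interval_start` and the `_floor` column are hypothetical names, not from the answer:

  from pyspark.sql import functions as F

  def add_interval_start(df, start_col, end_col, minutes=15):
      # Explode each row into one row per calendar-aligned interval of `minutes`
      # that the [start_col, end_col] range touches.
      step = 60 * minutes
      return (
          df
          .withColumn(
              "_floor",
              (F.floor(F.col(start_col).cast("long") / step) * step).cast("timestamp"),
          )
          .withColumn(
              "interval_start",
              F.explode(F.expr(f"sequence(_floor, {end_col}, interval {minutes} minutes)")),
          )
          .drop("_floor")
      )

  # Usage with the sample DataFrame built earlier:
  add_interval_start(data, "rh_start_time", "rh_end_time", minutes=15).show(truncate=False)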

