How to generate session windows in a streaming query in PySpark?

kq0g1dla  asked on 2024-01-06  in Spark

The code below works fine in batch mode, but in a streaming query it fails with the infamous AnalysisException:
Non-time-based windows are not supported on streaming DataFrames/Datasets

  from pyspark.sql.functions import *
  from pyspark.sql.window import Window

  temp = [
      ('Alice', 1),
      ('Alice', 60),
      ('Alice', 160),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1112),
      ('Bob', 3),
      ('Alice', 2),
      ('Bob', 2),
      ('Alice', 3),
      ('Bob', 1)
  ]
  temp_df = spark.createDataFrame(temp, ["user", "ev_timestamp"])

  maxSessionDuration = 60 * 10  # Max session duration of 10 minutes.
  client_fp_time_window = Window.partitionBy("user").orderBy("ev_timestamp")

  # Mark an event as the start of a new session when the gap to the user's
  # previous event exceeds maxSessionDuration, then derive a sessionId as a
  # running sum of those markers.
  rowsWithSessionIds = temp_df \
      .select("user", "ev_timestamp",
              lag("ev_timestamp", 1).over(client_fp_time_window).alias("prevEvTimestamp")) \
      .select("user", "ev_timestamp",
              when(
                  (col("ev_timestamp").cast('long') - col("prevEvTimestamp").cast('long')) < maxSessionDuration, 0) \
              .otherwise(1).alias("isNewSession")
              ) \
      .select("user", "ev_timestamp",
              sum("isNewSession").over(client_fp_time_window).alias("sessionId"))
  display(rowsWithSessionIds)

  sessionsDF = rowsWithSessionIds \
      .groupBy("user", "sessionId") \
      .agg(min("ev_timestamp").alias("startTime"),
           max("ev_timestamp").alias("endTime"),
           count("*").alias("count")) \
      .alias('Session')
  display(sessionsDF)

I understand this happens because the lag() function is not supported in streaming queries. The recommended alternative is the mapGroupsWithState() method, but that is only available in Scala/Java.
How can this be done in PySpark? Or what other alternatives are there for structured sessionization in PySpark?
The desired output for each batch looks like this:

  user   sessionId  startTime  endTime  count
  Bob    1          1          3        3
  Alice  1          1          160      5
  Alice  2          1111       1112     7


btxsgosb1#

Since Spark >= 3.2.0, F.session_window is available, and it works for both streaming and batch. The output differs slightly from yours: instead of a sessionId we get a session_window.

  from pyspark.sql import functions as F

  temp = [
      ('Alice', 1),
      ('Alice', 60),
      ('Alice', 160),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1111),
      ('Alice', 1112),
      ('Bob', 3),
      ('Alice', 2),
      ('Bob', 2),
      ('Alice', 3),
      ('Bob', 1)
  ]
  temp_df = spark.createDataFrame(temp, ['user', 'ev_timestamp_sec'])
  temp_df = temp_df.withColumn(
      'ev_timestamp', F.timestamp_seconds('ev_timestamp_sec')
  )

  # For Structured Streaming, we have to set a watermark
  temp_df = temp_df.withWatermark('ev_timestamp', '5 minutes')

  sess_window = F.session_window(
      timeColumn='ev_timestamp', gapDuration='10 minutes'
  )
  agg_cols = [
      F.min('ev_timestamp').alias('startTime'),
      F.max('ev_timestamp').alias('endTime'),
      F.count('*').alias('count')
  ]
  sessions_df = temp_df.groupBy('user', sess_window).agg(*agg_cols)
  sessions_df.show(3, False)
  # +-----+------------------------------------------+-------------------+-------------------+-----+
  # |user |session_window                            |startTime          |endTime            |count|
  # +-----+------------------------------------------+-------------------+-------------------+-----+
  # |Alice|{1970-01-01 00:00:01, 1970-01-01 00:12:40}|1970-01-01 00:00:01|1970-01-01 00:02:40|5    |
  # |Alice|{1970-01-01 00:18:31, 1970-01-01 00:28:32}|1970-01-01 00:18:31|1970-01-01 00:18:32|7    |
  # |Bob  |{1970-01-01 00:00:01, 1970-01-01 00:10:03}|1970-01-01 00:00:01|1970-01-01 00:00:03|3    |
  # +-----+------------------------------------------+-------------------+-------------------+-----+
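
The session_window column is a struct with start and end fields, so if you want output shaped like the table in the question you can flatten it afterwards. A minimal sketch (flat_sessions_df is just an illustrative name):

  # Flatten the session_window struct into startTime/endTime columns.
  # Note: session_window.end is the last event time plus the gap duration,
  # so it is not the same as max(ev_timestamp).
  flat_sessions_df = sessions_df.select(
      'user',
      F.col('session_window.start').alias('startTime'),
      F.col('session_window.end').alias('endTime'),
      'count'
  )
  flat_sessions_df.show(truncate=False)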

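To run this as an actual streaming query, the same watermark + session_window aggregation can be plugged into readStream/writeStream. A rough sketch, assuming the built-in rate source as a stand-in for a real event stream, with purely illustrative column derivations:

  from pyspark.sql import functions as F

  # The rate source emits (timestamp, value) rows; derive a fake user key from it.
  events = (
      spark.readStream.format('rate').option('rowsPerSecond', 10).load()
      .withColumn('user', (F.col('value') % 3).cast('string'))
      .withColumnRenamed('timestamp', 'ev_timestamp')
  )

  sessions = (
      events
      .withWatermark('ev_timestamp', '5 minutes')
      .groupBy('user', F.session_window('ev_timestamp', '10 minutes'))
      .agg(
          F.min('ev_timestamp').alias('startTime'),
          F.max('ev_timestamp').alias('endTime'),
          F.count('*').alias('count')
      )
  )

  # Sessions are emitted once the watermark passes their end; append mode
  # requires the watermark defined above.
  query = (
      sessions.writeStream
      .outputMode('append')
      .format('console')
      .option('truncate', 'false')
      .start()
  )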

