pyspark 流 Dataframe /数据集不支持非基于时间的窗口,如何解决这个问题?

pw136qt2  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(193)

在我的代码中,有一个特定的要求,由于这一点,我不得不使用分区的流媒体框架。但给我以下错误
流式 Dataframe /数据集不支持非基于时间的窗口
我该如何解决这个问题,有人能帮忙吗?

  1. df = (
  2. spark.readStream.format("delta")
  3. .option("ignoreChanges", "true")
  4. .table(table)
  5. )
  6. window_spec = Window.partitionBy("id1","id2","id3").orderBy("BronzeLoadDateTime")
  7. df = df.withColumn("KEY", when(col("key_indicator") == "KID", col("key_value")).otherwise(None))
  8. df = df.withColumn("KEY", first("KEY", ignorenulls=True).over(window_spec))
  9. return df

字符串

pn9klfpd

pn9klfpd1#

Spark Structured Streaming仅允许基于时间的窗口,不支持使用“over()”的传统SQL窗口。
Spark Structured Streaming无法基于特定列划分所有数据。但是,这是不可行的,因为流由无限输入数据组成。
另一种方法是在Spark Structured Streaming中使用groupBy()
需要注意的是,groupBy()是一个有状态的操作,这意味着它不能在追加模式下实现,除非在要对其执行groupBy操作的列列表中包含一个timestamp列。

  1. df_result_stream = df.withWatermark("createdAt", "10 minutes" ) \
  2. .groupBy( F.col('Id'), window(F.col("createdAt"), self.acceptable_time_difference)) \
  3. .agg(F.max(F.col('createdAt')).alias('maxCreatedAt'))

字符串
在上面的例子中,createdAt是一个timestamp类型的列。在这个场景中,我们需要预先在timestamp列上使用withWatermark。


的数据
这是必要的,因为Spark不能无休止地存储状态。
请注意,尽管groupBy的工作方式与窗口操作不同,但您可以使用基本的join或使用mapGroupsWithState的自定义函数来实现所需的功能。
Windows总是需要基于时间的数据,但Spark Structured Streaming不需要。
您可以使用触发器“尽快”设置Spark Structured Streaming作业,然后根据特定的时间窗口对数据进行分组。
了解更多Structured Streaming Programming

展开查看全部

相关问题