在我的代码中,有一个特定的要求,由于这一点,我不得不使用分区的流媒体框架。但给我以下错误
流式 Dataframe /数据集不支持非基于时间的窗口
我该如何解决这个问题,有人能帮忙吗?
df = (
spark.readStream.format("delta")
.option("ignoreChanges", "true")
.table(table)
)
window_spec = Window.partitionBy("id1","id2","id3").orderBy("BronzeLoadDateTime")
df = df.withColumn("KEY", when(col("key_indicator") == "KID", col("key_value")).otherwise(None))
df = df.withColumn("KEY", first("KEY", ignorenulls=True).over(window_spec))
return df
字符串
1条答案
按热度按时间pn9klfpd1#
Spark Structured Streaming仅允许基于时间的窗口,不支持使用“
over()
”的传统SQL窗口。Spark Structured Streaming无法基于特定列划分所有数据。但是,这是不可行的,因为流由无限输入数据组成。
另一种方法是在Spark Structured Streaming中使用
groupBy()
。需要注意的是,
groupBy()
是一个有状态的操作,这意味着它不能在追加模式下实现,除非在要对其执行groupBy操作的列列表中包含一个timestamp列。字符串
在上面的例子中,createdAt是一个timestamp类型的列。在这个场景中,我们需要预先在timestamp列上使用withWatermark。
的数据
这是必要的,因为Spark不能无休止地存储状态。
请注意,尽管
groupBy
的工作方式与窗口操作不同,但您可以使用基本的join
或使用mapGroupsWithState
的自定义函数来实现所需的功能。Windows总是需要基于时间的数据,但Spark Structured Streaming不需要。
您可以使用触发器“尽快”设置Spark Structured Streaming作业,然后根据特定的时间窗口对数据进行分组。
了解更多Structured Streaming Programming