假设我想运行一个流作业,它每x秒获取一次新数据,并为每个触发器输出新行,而不进行任何聚合。例如:
val query = wordCounts.writeStream
.outputMode("append")
.trigger(Trigger.ProcessingTime("15 seconds"))
.format("console")
.start()
在spark文档中,我看到了以下内容:
对输入的查询将生成“结果表”。每一个触发间隔(比如说,每1秒),新行被追加到输入表,最终更新结果表。每当更新结果表时,我们都希望将更改的结果行写入外部接收器。“
追加模式-只有自上一个触发器以来追加到结果表中的新行才会写入外部存储器。这仅适用于结果表中的现有行预计不会更改的查询。”
我知道在append模式下,只有新行被写入控制台,但是“result table”仍然有以前触发器的历史行,对吗?所以,这个表的大小可能会在很长一段时间内增加。有没有办法从结果表中删除这些旧行,因为它们不需要也不会更改?
我看到了windowing+水印功能,但是它主要用于聚合查询,对吗?
暂无答案!
目前还没有任何答案,快来回答吧!