我正在构建一个spark结构化流应用程序,在这里我正在进行批处理流连接。批处理数据的源会定期更新。
因此,我计划定期对批处理数据进行持久化/非持久化。
下面是我用来持久化和取消持久化批处理数据的示例代码。
流量:
读取批次数据
保留批处理数据
每隔一小时,取消数据持久化,读取批处理数据并再次持久化。
但是,我没有看到每小时都刷新批处理数据。
代码:
var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()
if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
refreshedTime = Instant.now()
batchDF.unpersist(false)
batchDF = handler.readBatchDF(sparkSession)
.persist(StorageLevel.MEMORY_AND_DISK)
}
在spark结构化流媒体作业中,有没有更好的方法来实现这个场景?
1条答案
按热度按时间e0bqpujr1#
您可以通过使用结构化流提供的流调度功能来实现这一点。
通过创建一个人工的“速率”流定期刷新静态Dataframe,可以触发静态Dataframe的刷新(unpersist->load->persist)。其目的是:
初始加载静态Dataframe并保持
var
定义刷新静态Dataframe的方法使用在所需间隔(例如1小时)触发的“速率”流
读取实际流数据并使用静态Dataframe执行连接操作
在这个速率流中有一个
foreachBatch
调用刷新方法的接收器以下代码在spark 3.0.1、scala 2.12.10和delta 0.7.0中运行良好。
为了获得完整的示例,创建了delta表,并使用新值进行了更新,如下所示: