所以,问题是我删除了我的delta湖内容,然后我开始了我的spark工作,随后又重新填充了delta湖。但是,即使在delta lake成功完成更新之后,spark作业仍抛出“delta lake location not found”。但令人惊讶的是,如果我的工作持续3-4个小时,它开始挑选三角洲湖的内容。所以,我最关心的是为什么要花这么多时间才能到达三角洲湖的位置?附加我的代码段以供参考
val StreamingQuery = DF
.writeStream
.foreachBatch
{ (batchDF: DataFrame, batchID: Long) =>
val currentTime = LocalDateTime.now()
if(currentTime.isAfter(lastFetchedTime.plusMinutes(3)))
{
try{
DeltaLakeDF = Singleton.spark.read.format("delta").load("delta_lake_location")
lastFetchedTime = currentTime
}
catch {
case ex: Exception => {
println("error here")
logger.error("Exception found: ", ex)
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!