我有下面的代码,我希望用于复制数据到达一个文件夹定期:
val streamingQuery = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.schema("`path` STRING, `modificationTime` TIMESTAMP, `length` BIGINT, `content` BINARY")
.option("recursiveFileLookup", "true")
.load("my path here")
.filter(col("modificationTime") > "2023-10-30 07:00:00")
.writeStream
.trigger(Trigger.AvailableNow())
.foreachBatch (my code goes here on how I copy files)
.option("checkpointLocation", "my path here")
.start()
.awaitTermination()
字符串
我的问题是,在我的情况下,检查点实际上会保存什么?从文件夹中读取的所有内容,还是只保存那些按修改时间过滤的文件?我问这个问题的原因是因为我想让spark避免尝试读取文件夹中的所有文件-这就是为什么我想有一个检查点-然而,我不想在修改日期之前复制所有文件,因为它们不需要。
1条答案
按热度按时间ekqde3dh1#
使用这种方法,无论如何都会扫描所有文件,Spark会进行过滤。最好使用Databricks Autoloader(doc)的
modifiedAfter
选项来指定截止时间戳-然后在文件扫描期间进行过滤。P.S.另一件可以帮助提高性能的事情是使用文件通知模式而不是默认的目录列表模式。尽管如此,它可能需要存储帐户的额外权限。