文档说文件事件通知系统不能保证100%交付所有文件,它建议使用回填来保证所有文件最终得到处理。但是不清楚如何使用它以及在代码中在哪里使用它。它应该是spark.readStream还是writeStream的一部分。如果有更多关于它的文件,将不胜感激。
spark.readStream
writeStream
xzlaal3s1#
你可以这样使用cloudFiles.backfillInterval:
cloudFiles.backfillInterval
df = spark.readStream.format("cloudFiles") \ .options(**autoloader_config) \ .options("cloudFiles.backfillInterval", "1 day") \ .load("/mnt/data_path/")
字符串根据文档,它异步检查未处理的文件并处理它们。通过设置间隔,可以控制系统检查未处理文件的频率。输出量:
的数据如果你看到检查站的位置。%fs head dbfs:/checkpointLocation2/offsets/0个
%fs head dbfs:/checkpointLocation2/offsets/0
的这里,基于lastBackfillStartTimeMs和lastBackfillFinishTimeMs,触发发生。你还可以观察到偏移量内有5个文件,这意味着它检查旧文件处理5次。这是当我为1 day设置间隔时,它将每天触发一次。
lastBackfillStartTimeMs
lastBackfillFinishTimeMs
1 day
1条答案
按热度按时间xzlaal3s1#
你可以这样使用
cloudFiles.backfillInterval
:字符串
根据文档,它异步检查未处理的文件并处理它们。
通过设置间隔,可以控制系统检查未处理文件的频率。
输出量:
的数据
如果你看到检查站的位置。
%fs head dbfs:/checkpointLocation2/offsets/0
个的
这里,基于
lastBackfillStartTimeMs
和lastBackfillFinishTimeMs
,触发发生。你还可以观察到偏移量内有5个文件,这意味着它检查旧文件处理5次。这是当我为
1 day
设置间隔时,它将每天触发一次。