我有计划上载到Azure存储帐户的数据。我的计划是在Synapse Studio中创建一个管道,其中包括一个Apache Notebook(使用PySpark)。主要目标是让Notebook处理数据,然后将其保存到Lake数据库。
数据将按照以下格式上传到Azure存储容器,例如:2022/Week1/Week1.xlsx
和2023/Week10/Week10.xlsx
中的至少一个。最初,我将在存储帐户中存储和处理所有历史数据。之后,数据将被处理并每周添加到湖泊数据库中。现在的问题是,让Azure管道或Notebook仅识别和处理新添加的数据的最有效方法是什么?.
1条答案
按热度按时间oug3syen1#
这里有一个示例模式,您可以使用它来确保不会两次导入同一个文件。
它使用文件夹强制文件的状态。
创建这些文件夹(在ADLS中)
现在,当您的管道执行时,它需要执行以下操作:
1.将所有文件从 Waiting 复制到 Processing
1.处理在 * 正在处理 * 中找到的所有文件
1.如果文件处理成功,则将其移动到 Archive
1.如果文件未成功处理,请将其移动到“Failed(失败)”
其工作方式是,任何要导入的文件都应复制到 Waiting 文件夹中
在执行管道之后,Waiting 文件夹中的每个文件都应该已经被移出,因此当您再次运行管道时,* 不会 * 被再次处理
执行后,存在于这些文件夹中的文件具有以下含义
这些只是建议的文件夹名称-使用任何您喜欢的名称