azure Databricks Autoloader多个文件夹

rsaldnfx  于 2023-08-07  发布在  其他
关注(0)|答案(1)|浏览(125)

我很难理解autoloader如何在adls gen 2中处理多个文件夹,以及我应该如何传递data_source路径。
我有下面的文件夹strcutre,其中数据是加载多个表在evey 15分钟在我的存储帐户。根目录看起来像这样:


在每个文件夹中,我有每个表的实际数据


data_source = f"abfss://name@prodstorage.dfs.core.windows.net/{what should be a as root directory parameter}/{table_name}/*.csv"
source_format = "csv"


# Configure Auto Loader to ingest csv data to a Delta table
df = (
    spark.readStream
    .option("delimiter", ",")
    .option("quote", '"')
    .option("mode", "permissive")
    .option("lineSep", "\r\n")
    .option("multiLine", "true")
    .format("cloudFiles")
    .option("cloudFiles.format", source_format)
    .option("cloudFiles.schemaLocation", checkpoint_directory)
    .option("header", "false")
    .option("escape", '"')
    .schema(schema)
    .load(data_source)

字符串
问题是我应该如何将数据源路径传递给将与自动加载器一起工作的代码?

ldioqlga

ldioqlga1#

Autoloader可以一次处理一个文件,它存储最后处理的文件,它可以做结构流。
Autoloader提供了一个称为云文件的结构化流源,以增量和有效地处理新数据。当它们到达云存储时。

.option("recursiveFileLookup",'True')

字符串
上面的代码将允许您忽略传入 Dataframe 的文件夹级别分区。
递归文件查找是使用如果你有任何类型的分区到 Dataframe 在这种情况下,你可以把它打开或关闭,以利用它。
通过使用云原生组件过滤掉新到达的文件,实现数据加载过程的自动化。

  • 注意:云文件数据读取器是从基于云的文件系统读取数据的组件,例如Amazon s3、ADLS gen 2云通知服务:云通知服务是由基于云的文件系统提供的服务,允许您在文件系统发生更改(或)删除现有文件时接收通知。*

将数据源路径传递给将使用Autoloader的代码

df=(
spark.readStream.format('cloudFiles')
.option('cloudFiles.format',"CSV")
.option('cloudFiles.schemaLocation',Schema_loc)
.option('header',True)
.load(Source_data_loc)
)


例如,我必须文件夹SampleFiles,它由2个Emp表和Dept表组成。


的数据


输出写入target_data_loc

相关问题