我尝试使用Autoloader加载几种类型的csv文件,它目前将我放入的所有csv合并到一个大的Parquet表中,我想要的是为每种模式/csv_file创建Parquet表
当前代码执行以下操作:
#Streaming files/ waiting a file to be dropped
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("delimiter", "~|~") \
.option("cloudFiles.inferColumnTypes","true") \
.option("cloudFiles.schemaLocation", pathCheckpoint) \
.load(sourcePath) \
.writeStream \
.format("delta") \
.option("mergeSchema", "true") \
.option("checkpointLocation", pathCheckpoint) \
.start(pathResult)
我想要的
1条答案
按热度按时间wko9yo5t1#
您可以在Autoloader(databricks documentation)上使用pathGlobFilter选项,为同一源目录中的每个文件创建不同的autoloader流,并过滤要使用的文件名。通过这样做,您最终将为每个表使用不同的检查点,并且您可以让不同的模式工作。
这个解类似于这里指出的How to filter files in Databricks Autoloader stream。