pyspark 使用Databricks Auto Loader摄取几种类型的CSV

5us2dqdw  于 2023-04-29  发布在  Spark
关注(0)|答案(1)|浏览(135)

我尝试使用Autoloader加载几种类型的csv文件,它目前将我放入的所有csv合并到一个大的Parquet表中,我想要的是为每种模式/csv_file创建Parquet表
当前代码执行以下操作:

#Streaming files/ waiting a file to be dropped
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("delimiter", "~|~") \
  .option("cloudFiles.inferColumnTypes","true") \
  .option("cloudFiles.schemaLocation", pathCheckpoint) \
  .load(sourcePath) \
  .writeStream \
  .format("delta") \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", pathCheckpoint) \
  .start(pathResult)

我想要的

wko9yo5t

wko9yo5t1#

您可以在Autoloader(databricks documentation)上使用pathGlobFilter选项,为同一源目录中的每个文件创建不同的autoloader流,并过滤要使用的文件名。通过这样做,您最终将为每个表使用不同的检查点,并且您可以让不同的模式工作。
这个解类似于这里指出的How to filter files in Databricks Autoloader stream

相关问题