如何使用pyspark文件系统读取最近一小时内上传的新文件?

j9per5c4  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(118)

我试图读取最新的文件(说在过去一个小时的新文件)在一个目录中可用并加载该数据.我正在尝试与pyspark结构化流.我已经尝试了Spark流的maxFileAge选项,但它仍然是加载目录中的所有文件,无论在选项中指定的时间.

spark.readStream\
.option("maxFileAge", "1h")\
.schema(cust_schema)\
    .csv(upload_path) \
    .withColumn("closing_date", get_date_udf_func(input_file_name()))\
    .writeStream.format('parquet') \
    .trigger(once=True) \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', write_path) \
    .start()

字符串
上面是我尝试的代码,但它会加载所有可用的文件,而不管时间。请指出我在这里做错了什么。

plicqrtu

plicqrtu1#

抱歉,来晚了。
.option("maxFileAge", "1h")仅在创建checkpointLocation之后才有效。
范例:
第一次运行,读取所有内容。第二次运行,检查checkpointLocation是否已创建,然后通过maxFileAge过滤并将这些文件添加到checkpointLocation
你可以用basepath在readstream中只读取一个文件来创建一个checkpointLocation,然后maxage参数将在接下来的批处理中起作用。

spark.readStream\
.option("maxFileAge", "1h")\
.option("basePath", upload_path)\
.schema(cust_schema)\
    .csv(upload_path + "/partition1=xxxx/partition2=yyyy/") \
...

字符串

相关问题