我试图读取最新的文件(说在过去一个小时的新文件)在一个目录中可用并加载该数据.我正在尝试与pyspark结构化流.我已经尝试了Spark流的maxFileAge选项,但它仍然是加载目录中的所有文件,无论在选项中指定的时间.
spark.readStream\
.option("maxFileAge", "1h")\
.schema(cust_schema)\
.csv(upload_path) \
.withColumn("closing_date", get_date_udf_func(input_file_name()))\
.writeStream.format('parquet') \
.trigger(once=True) \
.option('checkpointLocation', checkpoint_path) \
.option('path', write_path) \
.start()
字符串
上面是我尝试的代码,但它会加载所有可用的文件,而不管时间。请指出我在这里做错了什么。
1条答案
按热度按时间plicqrtu1#
抱歉,来晚了。
.option("maxFileAge", "1h")
仅在创建checkpointLocation之后才有效。范例:
第一次运行,读取所有内容。第二次运行,检查checkpointLocation是否已创建,然后通过maxFileAge过滤并将这些文件添加到
checkpointLocation
。你可以用basepath在readstream中只读取一个文件来创建一个checkpointLocation,然后maxage参数将在接下来的批处理中起作用。
字符串