在pyspark中只读非合并文件

ee7vknir  于 2023-02-13  发布在  Apache
关注(0)|答案(1)|浏览(122)

我在N个文件夹(例如/user/deltas/1/delta1.csv、/user/deltas/2/delta 2csv、.../user/deltas/n/deltaN.csv)中有N个增量所有增量都有相同的列,只是列中的信息不同。
我有一个从文件夹“增量”阅读我的csv文件的代码

dfTable = spark.read.format("csv").option("recursiveFileLookup","true")\
.option("header", "true).load("/home/user/deltas/")

我将使用deltaTable.merge来合并和更新来自增量的信息,并将更新后的信息写入表中(main_table.csv)例如,明天i将具有带有另一更新信息的新增量,我将再次运行我的代码来刷新main_table.csv中的数据。如何避免deltaTable.merge之前已经使用过的增量被添加到文件main_table.csv中?是否有可能改变文件类型后,三角洲的运行,例如Parquet,这是如何避免再次重复使用三角洲?因为我阅读csv文件,而不是Parquet,或类似的日志文件等。

jei2mxaa

jei2mxaa1#

我认为时间路径过滤器可以很好地用于您的用例,如果您每天都运行代码(手动或通过作业),那么您可以使用modifiedAfter参数只加载1天后修改的文件(或者无论您重新运行代码的频率如何)。

from datetime import datetime, timedelta

timestamp_last_run = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%dT-%H:%M:%S")

dfTable = spark.read.format("csv").option("recursiveFileLookup","true")\
.option("header", "true).load("/home/user/deltas/", modifiedAfter=timestamp_last_run)

## ...perform merge operation and save data in main_table.csv

相关问题