pyspark Autolaoder - filenotification with modifiedBefore

iqih9akk  于 2024-01-06  发布在  Spark
关注(0)|答案(2)|浏览(136)

在当前配置下,evey 1h我们会获得包含新数据的新文件夹。
See image here.
我正在利用文件通知,我不喜欢切换到目录列表。但是,我遇到了一个问题,即最新文件夹中的CSV文件不断更新。当Autoloader尝试读取当时正在更新的CSV文件时,这会导致作业失败。我正在探索排除最新文件夹的方法,并遇到了modifiedBefore参数,但我不确定它是否与FileNotification兼容。

qltillow

qltillow1#

**modifiedBefore**是autoloader中的通用选项,可用于文件通知模式。

您提到文件每小时到达一次,最新的文件更新非常频繁,当您使用autoloader进行增量加载时会导致错误。
为了避免这种情况,您可以提供一个模式匹配所有文件(最新文件除外)的路径,或者使用**modifiedBefore**选项。
在这两种情况下,你应该知道时间戳。
例如,如果你不需要13:00:00之后的数据,你可以使用下面的模式:

  1. "/2023-12-20T(0[0-9]|1[0-2]).[0-9][0-9].[0-9][0-9]Z/"

字符串
有关模式的更多信息,请参阅此documentation
或者,您可以使用**modifiedBefore**选项:

  1. autoloader_config = {
  2. "cloudFiles.format":"csv",
  3. "cloudFiles.useNotifications": "true",
  4. "cloudFiles.resourceGroup":resourcegroup,
  5. "cloudFiles.clientId": client_id,
  6. "cloudFiles.clientSecret": client_secret,
  7. "cloudFiles.tenantId": tenant_id,
  8. "cloudFiles.connectionString":conn_string,
  9. "cloudFiles.subscriptionId": subscription_id,
  10. "cloudFiles.schemaLocation":schema_location,
  11. "header": True,
  12. "modifiedBefore":"2023-12-20 13:00:00.000000 UTC+5:30"
  13. }


如果你想根据最后一小时进行过滤,你可以使用下面的代码来实现:

  1. from datetime import datetime, timedelta
  2. from zoneinfo import ZoneInfo
  3. india_dt = datetime.now(tz=ZoneInfo("Asia/Kolkata"))
  4. filter = india_dt - timedelta(hours=1)


这给出:2023-12-20 11:41:39.862054+05:30
然后,您可以在**modifiedBefore**选项中使用该过滤器。

注意:您需要指定与每小时创建的文件夹名称相匹配的分区。

展开查看全部
pprl5pva

pprl5pva2#

如果你在某个位置写大文件,并且有可能读取这些文件的作业可能会运行一些不完整的文件,那么你就必须在读取这些文件的作业中写一些代码/做一些杂耍。
通常这个问题是通过改变对写文件的作业的理解来解决的,改变它的逻辑,所以写一个新文件是一个两步的过程:
1.将文件写入单独的文件夹in-progress/<final-file-name>
1.将文件从in-progress/<final-file-name>删除/移动到staging/<final-file-name>
在大多数云文件系统中,重命名是一个原子操作,因此您的阅读器将永远无法读取不完整的文件。
如果您的读者正在寻找特定的文件后缀/模式,那么您也可以使用<final-file-name>.in-progress来代替单独的文件夹。逻辑保持不变。

相关问题