我正在尝试使用spark scala代码从hdfs文件夹加载增量数据。假设我有以下文件夹:
/hadoop/user/src/2021-01-22
/hadoop/user/src/2021-01-23
/hadoop/user/src/2021-01-24
/hadoop/user/src/2021-01-25
/hadoop/user/src/2021-01-26
/hadoop/user/src/2021-01-27
/hadoop/user/src/2021-01-28
/hadoop/user/src/2021-01-29
我在让路 /hadoop/user/src
从spark提交命令,然后写下面的代码
val Temp_path: String = args(1) // hadoop/user/src
val incre_path = ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val incre_path_day = formatter format incre_path
val new_path = Temp_path.concat("/")
val path = new_path.concat(incre_path_day)
因此它处理(sysdate-1)文件夹,即今天的日期是 2021-01-29
所以它会处理的 2021-01-28
目录的数据。
有没有什么方法可以修改代码,这样我就可以给出如下路径 hadoop/user/src/2021-01-22
代码将处理数据直到 2021-01-28
(即2021-01-23、2021-01-24、2021-01-25、2021-01-26、2021-01-27、2021-01-28)。
请建议我应该如何修改我的代码。
1条答案
按热度按时间iq3niunx1#
你可以用
listStatus
从hadoop文件系统中列出输入文件夹中的所有文件夹,并对日期部分进行筛选:你也可以通过考试
PathFilter
函数到listStatus
扫描基本文件夹时筛选路径的步骤