使用sparkscala处理具有特定日期范围的目录中的数据

lymgl2op  于 2021-07-15  发布在  Hadoop
关注(0)|答案(1)|浏览(332)

我正在尝试使用spark scala代码从hdfs文件夹加载增量数据。假设我有以下文件夹:

/hadoop/user/src/2021-01-22
/hadoop/user/src/2021-01-23
/hadoop/user/src/2021-01-24
/hadoop/user/src/2021-01-25
/hadoop/user/src/2021-01-26
/hadoop/user/src/2021-01-27
/hadoop/user/src/2021-01-28
/hadoop/user/src/2021-01-29

我在让路 /hadoop/user/src 从spark提交命令,然后写下面的代码

val Temp_path: String = args(1) // hadoop/user/src
val incre_path = ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val incre_path_day = formatter format incre_path
val new_path = Temp_path.concat("/")
val path = new_path.concat(incre_path_day)

因此它处理(sysdate-1)文件夹,即今天的日期是 2021-01-29 所以它会处理的 2021-01-28 目录的数据。
有没有什么方法可以修改代码,这样我就可以给出如下路径 hadoop/user/src/2021-01-22 代码将处理数据直到 2021-01-28 (即2021-01-23、2021-01-24、2021-01-25、2021-01-26、2021-01-27、2021-01-28)。
请建议我应该如何修改我的代码。

iq3niunx

iq3niunx1#

你可以用 listStatus 从hadoop文件系统中列出输入文件夹中的所有文件夹,并对日期部分进行筛选:

import org.apache.hadoop.fs.Path
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

val inputPath = "hadoop/user/src/2021-01-22"

val startDate = inputPath.substring(inputPath.lastIndexOf("/") + 1)
val endDate = DateTimeFormatter.ofPattern("yyyy-MM-dd").format(ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1))

val baseFolder = new Path(inputPath.substring(0, inputPath.lastIndexOf("/") + 1))

val files = baseFolder.getFileSystem(sc.hadoopConfiguration).listStatus(baseFolder).map(_.getPath.toString)
val filtredFiles = files.filter(path => path.split("/").last > startDate &&  path.split("/").last < endDate)

// finally load only the folders you want
val df = spark.read.csv(filtredFiles: _*)

你也可以通过考试 PathFilter 函数到 listStatus 扫描基本文件夹时筛选路径的步骤

相关问题