从文件读取数据时触发“modifiedbefore”选项

aij0ehis  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(405)

我正在使用spark-2.4从hadoop读取文件。要求读取修改时间早于某个给定值的文件。
我看到了spark文档中提到的选项 modifiedBefore ,请参考以下spark文档修改时间路径过滤器,但我不确定它是否在spark 2.4中可用,如果不可用,如何实现这一点?

xmjla07d

xmjla07d1#

选择 modifiedBefore 以及 modifiedAfter 从spark 3+开始提供,只能批量使用,不能流式传输。对于spark 2.4,可以使用hadoop文件系统 globStatus 方法并使用 getModificationTime .
下面是一个函数示例,该函数接受路径和阈值,并返回使用阈值筛选的文件路径列表:

import org.apache.hadoop.fs.Path

def getFilesModifiedBefore(path: Path, modifiedBefore: String) = {
  val format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  val thresHoldTime = format.parse(modifiedBefore).getTime()

  val files = path.getFileSystem(sc.hadoopConfiguration).globStatus(path)

  files.filter(_.getModificationTime < thresHoldTime).map(_.getPath.toString)

}

然后把它和 spark.read.csv :

val df = spark.read.csv(getFilesModifiedBefore(new Path("/mypath"), "2021-03-17T10:46:12"):_*)

相关问题