我正在使用spark-2.4从hadoop读取文件。要求读取修改时间早于某个给定值的文件。我看到了spark文档中提到的选项 modifiedBefore ,请参考以下spark文档修改时间路径过滤器,但我不确定它是否在spark 2.4中可用,如果不可用,如何实现这一点?
modifiedBefore
xmjla07d1#
选择 modifiedBefore 以及 modifiedAfter 从spark 3+开始提供,只能批量使用,不能流式传输。对于spark 2.4,可以使用hadoop文件系统 globStatus 方法并使用 getModificationTime .下面是一个函数示例,该函数接受路径和阈值,并返回使用阈值筛选的文件路径列表:
modifiedAfter
globStatus
getModificationTime
import org.apache.hadoop.fs.Path def getFilesModifiedBefore(path: Path, modifiedBefore: String) = { val format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss") val thresHoldTime = format.parse(modifiedBefore).getTime() val files = path.getFileSystem(sc.hadoopConfiguration).globStatus(path) files.filter(_.getModificationTime < thresHoldTime).map(_.getPath.toString) }
然后把它和 spark.read.csv :
spark.read.csv
val df = spark.read.csv(getFilesModifiedBefore(new Path("/mypath"), "2021-03-17T10:46:12"):_*)
1条答案
按热度按时间xmjla07d1#
选择
modifiedBefore
以及modifiedAfter
从spark 3+开始提供,只能批量使用,不能流式传输。对于spark 2.4,可以使用hadoop文件系统globStatus
方法并使用getModificationTime
.下面是一个函数示例,该函数接受路径和阈值,并返回使用阈值筛选的文件路径列表:
然后把它和
spark.read.csv
: