我无法从hdfs流式传输“旧”文件。
如果我的spark作业由于某种原因(例如demo、deployment)停止,但写入/移动到hdfs目录是连续的,那么我可能会在启动spark流式处理作业后跳过这些文件。
val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")
hdfsDStream.foreachRDD(
rdd => logInfo("Number of records in this batch: " + rdd.count())
)
输出-->此批中的记录数:0
spark streaming是否有办法将“已读”文件移动到其他文件夹?或者我们必须手动编程?因此,它将避免读取已经“读取”的文件。
spark流与在cron中运行spark作业(sc.textfile)相同吗?
3条答案
按热度按时间hrysbysz1#
此筛选函数用于确定每个路径是否实际是您首选的路径。所以apply中的函数应该根据您的需求定制。
现在您必须将filestream函数的第三个变量设置为false,这不仅是为了确保新文件,而且还要考虑流目录中现有的旧文件。
oxcyiej72#
您希望spark读取目录中已有的文件吗?如果是这样的话,这是一个常见的误解,让我大吃一惊。
textFileStream
监视目录中新文件的出现,然后读取它们。当您启动或读取目录中已存在的文件时,它会忽略目录中已存在的文件。基本原理是,您将有一些进程将文件写入hdfs,然后您将需要spark来读取它们。请注意,这些文件在很大程度上是以原子形式出现的,例如,它们是在其他地方缓慢写入的,然后移动到监视的目录中。这是因为hdfs不能同时正确地处理文件的读写。
xjreopfe3#
正如dean提到的,textfilestream使用默认的只使用新文件。
所以,它所做的就是把这个变种叫做
fileStream
```def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
}
newFilesOnly: Boolean = true,
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag]
(directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}
ssc.fileStream[LongWritable, Text, TextInputFormat]
(directory, FileInputDStream.defaultFilter, false).map(_._2.toString)