spark流媒体:hdfs

3qpi33ja  于 2021-05-30  发布在  Hadoop
关注(0)|答案(3)|浏览(340)

我无法从hdfs流式传输“旧”文件。
如果我的spark作业由于某种原因(例如demo、deployment)停止,但写入/移动到hdfs目录是连续的,那么我可能会在启动spark流式处理作业后跳过这些文件。

val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")

    hdfsDStream.foreachRDD(
      rdd => logInfo("Number of records in this batch: " + rdd.count())
    )

输出-->此批中的记录数:0
spark streaming是否有办法将“已读”文件移动到其他文件夹?或者我们必须手动编程?因此,它将避免读取已经“读取”的文件。
spark流与在cron中运行spark作业(sc.textfile)相同吗?

hrysbysz

hrysbysz1#

val filterF = new Function[Path, Boolean] {
    def apply(x: Path): Boolean = {
      println("looking if "+x+" to be consider or not")
      val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis){ println("considered "+x); list += x.toString; true}
       else{ false }
      return flag
    }
}

此筛选函数用于确定每个路径是否实际是您首选的路径。所以apply中的函数应该根据您的需求定制。

val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_output",filterF,false).map{case (x, y) => (y.toString)}

现在您必须将filestream函数的第三个变量设置为false,这不仅是为了确保新文件,而且还要考虑流目录中现有的旧文件。

oxcyiej7

oxcyiej72#

您希望spark读取目录中已有的文件吗?如果是这样的话,这是一个常见的误解,让我大吃一惊。 textFileStream 监视目录中新文件的出现,然后读取它们。当您启动或读取目录中已存在的文件时,它会忽略目录中已存在的文件。
基本原理是,您将有一些进程将文件写入hdfs,然后您将需要spark来读取它们。请注意,这些文件在很大程度上是以原子形式出现的,例如,它们是在其他地方缓慢写入的,然后移动到监视的目录中。这是因为hdfs不能同时正确地处理文件的读写。

xjreopfe

xjreopfe3#

正如dean提到的,textfilestream使用默认的只使用新文件。

def textFileStream(directory: String): DStream[String] = {
    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
  }

所以,它所做的就是把这个变种叫做 fileStream ```
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
}

再看看 `FileInputDStream` 类,我们将看到它确实可以查找现有文件,但默认为仅新建:

newFilesOnly: Boolean = true,

所以,回到 `StreamingContext` 代码,我们可以看到有和重载,我们可以通过直接调用 `fileStream` 方法:

def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag]
(directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}


### 因此,tl;博士;是

ssc.fileStream[LongWritable, Text, TextInputFormat]
(directory, FileInputDStream.defaultFilter, false).map(_._2.toString)

相关问题