hdfs文件在使用spark流时被覆盖,但我不想这样

mepcadol  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(314)

在我的spark流媒体应用程序中,如下所示:

val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2)

val words = lines.filter(examtep(_))
words.foreachRDD(exam(_))

//some other code

 def exam(rdd:RDD[SparkFlumeEvent]):Unit={
    if(rdd.count()>0) {
      println("****Something*****")
      val newrdd=rdd.map(sfe=>{
      val tmp=new String(sfe.event.getBody.array())
      tmp
      })
    newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest")
    }
}

当我运行这个应用程序时,在 words.foreachRDD() ,每次 exam() 方法执行, newsrdd.saveAsTextFile("/user/...") 将执行,并且dir temperaturetest中的文件将被覆盖,最后,我无法获得完整的数据。
我应该怎么写所有的数据 foreacheRDD() 到hdfs??另外,我的spark版本是1.2.1

t1qtbnec

t1qtbnec1#

在创建Flume流之后,直接使用dstream函数而不是rdd函数。dstream有一个内置的saveastextfiles操作,可以执行您所需的操作。

lines.filter( line =>  //Do your filter )
.map( spf => spf.event.getBody)
.saveAsTextFiles("hdfs://localhost:8020/tmp/output")

相关问题