在我的spark流媒体应用程序中,如下所示:
val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2)
val words = lines.filter(examtep(_))
words.foreachRDD(exam(_))
//some other code
def exam(rdd:RDD[SparkFlumeEvent]):Unit={
if(rdd.count()>0) {
println("****Something*****")
val newrdd=rdd.map(sfe=>{
val tmp=new String(sfe.event.getBody.array())
tmp
})
newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest")
}
}
当我运行这个应用程序时,在 words.foreachRDD()
,每次 exam()
方法执行, newsrdd.saveAsTextFile("/user/...")
将执行,并且dir temperaturetest中的文件将被覆盖,最后,我无法获得完整的数据。
我应该怎么写所有的数据 foreacheRDD()
到hdfs??另外,我的spark版本是1.2.1
1条答案
按热度按时间t1qtbnec1#
在创建Flume流之后,直接使用dstream函数而不是rdd函数。dstream有一个内置的saveastextfiles操作,可以执行您所需的操作。