我有一个spark1.2.0版本的spark流媒体环境,在那里我从本地文件夹中检索数据,每次我发现一个新文件添加到文件夹中,我都会执行一些转换。
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
为了对数据流数据执行分析,我必须将其转换为一个数组
var arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
然后我使用获得的数据来提取我想要的信息并将它们保存在hdfs上。
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
因为我真的需要用数组处理数据,所以用数组在hdfs上保存数据是不可能的 DStream.saveAsTextFiles("...")
(这将很好地工作)我必须保存rdd,但在此之前,我终于有了名为part-00000等的空输出文件。。。
带着一个 arr.foreach(println)
我能看到转换的正确结果。
我的怀疑是spark在每一批都试图在相同的文件中写入数据,删除以前写入的内容。我试图保存在一个动态命名的文件夹中,如 myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())
但总是只创建一个文件夹,输出文件仍然是空的。
如何在spark流上下文中将rdd写入hdfs?
2条答案
按热度按时间k7fdbhmy1#
你使用的spark流不是设计好的。我建议在您的用例中使用spark,或者修改您的代码,使其以spark的方式工作。将数组收集到驱动程序会破坏使用分布式引擎的目的,并使您的应用程序有效地成为一台机器(两台机器也会导致更多的开销,而不仅仅是在一台机器上处理数据)。
你能用数组做的一切,你都能用spark做。因此,只需在流中运行您的计算,分布在worker上,并使用
DStream.saveAsTextFiles()
. 你可以用foreachRDD
+saveAsParquet(path, overwrite = true)
写入单个文件。eni9jsuy2#
@vzamboni:spark 1.5+dataframes api具有以下功能: