scala—在spark流上下文中将rdd写入hdfs

k4aesqcs  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(433)

我有一个spark1.2.0版本的spark流媒体环境,在那里我从本地文件夹中检索数据,每次我发现一个新文件添加到文件夹中,我都会执行一些转换。

val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)

为了对数据流数据执行分析,我必须将其转换为一个数组

var arr = new ArrayBuffer[String]();
   data.foreachRDD {
   arr ++= _.collect()
}

然后我使用获得的数据来提取我想要的信息并将它们保存在hdfs上。

val myRDD  = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")

因为我真的需要用数组处理数据,所以用数组在hdfs上保存数据是不可能的 DStream.saveAsTextFiles("...") (这将很好地工作)我必须保存rdd,但在此之前,我终于有了名为part-00000等的空输出文件。。。
带着一个 arr.foreach(println) 我能看到转换的正确结果。
我的怀疑是spark在每一批都试图在相同的文件中写入数据,删除以前写入的内容。我试图保存在一个动态命名的文件夹中,如 myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()) 但总是只创建一个文件夹,输出文件仍然是空的。
如何在spark流上下文中将rdd写入hdfs?

k7fdbhmy

k7fdbhmy1#

你使用的spark流不是设计好的。我建议在您的用例中使用spark,或者修改您的代码,使其以spark的方式工作。将数组收集到驱动程序会破坏使用分布式引擎的目的,并使您的应用程序有效地成为一台机器(两台机器也会导致更多的开销,而不仅仅是在一台机器上处理数据)。
你能用数组做的一切,你都能用spark做。因此,只需在流中运行您的计算,分布在worker上,并使用 DStream.saveAsTextFiles() . 你可以用 foreachRDD + saveAsParquet(path, overwrite = true) 写入单个文件。

eni9jsuy

eni9jsuy2#

@vzamboni:spark 1.5+dataframes api具有以下功能:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);

相关问题