我创建了一个spark作业,每天从hdfs读取一个文本文件,并从文本文件的每一行提取唯一的键。每个文本文件中大约有50000个键。相同的数据然后被提取的密钥过滤并保存到hdfs。
我想在hdfs中创建一个目录,其结构如下:hdfs://.../date/key 包含筛选数据的。问题是写入hdfs需要很长时间,因为有太多的键。
现在的写法是:
val inputData = sparkContext.textFile(""hdfs://...", 2)
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings
keys.map(key => {
val filteredData = cleanedData.filter(line => line.contains(key))
filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key")
})
有没有办法让这更快?我曾考虑过将数据重新划分为提取的键的数量,但之后我无法以这种格式保存hdfs://.../date/key. 我也尝试过groupbykey,但是我无法保存这些值,因为它们不是rdd。
感谢您的帮助:)
3条答案
按热度按时间yftpprvb1#
您只为输入指定了2个分区,为输出指定了1个分区。这样做的一个影响是严重限制了这些操作的并行性。为什么需要这些?
与其计算50000个过滤过的RDD(速度也很慢),不如直接按键分组?我知道你想把它们输出到不同的目录,但这确实造成了这里的瓶颈。有没有另一种方法可以简单地让您读取(键、值)结果?
voj3qocg2#
我认为这种方法应该类似于通过键spark-one-spark作业写入多个输出。分区号与目录号无关。要实现它,您可能需要用自定义版本覆盖generatefilenameforkeyvalue以保存到不同的目录。
关于可伸缩性,这不是spark的问题,而是hdfs的问题。但不管你怎么实现,只要要求不改变,都是不可避免的。但我认为hdfs可以处理50000个文件处理程序
nfeuvbwi3#