我在s3中有2000万个文件,大约8000天。
文件按utc时间戳组织,如下所示: s3://mybucket/path/txt/YYYY/MM/DD/filename.txt.gz
. 每个文件都是utf-8文本,包含0(空)到100kb的文本(第95个百分位,尽管有一些文件高达几mbs)。
使用spark和scala(我对这两个都是新手,想学习),我想保存“daily bundle”(其中8000个),每个bundle包含当天找到的任何数量的文件。理想情况下,我想存储原始文件名以及它们的内容。输出应该驻留在s3以及压缩,在一些格式,适合在进一步的Spark步骤和实验输入。
一个想法是将bundle存储为一堆json对象(每行一个,每行一个) '\n'
-分离),例如。
{id:"doc0001", meta:{x:"blah", y:"foo", ...}, content:"some long string here"}
{id:"doc0002", meta:{x:"foo", y:"bar", ...}, content: "another long string"}
或者,我可以尝试hadoop sequencefile,但我不知道如何优雅地设置它。
以spark shell为例,我发现读取文件非常容易,例如:
val textFile = sc.textFile("s3n://mybucket/path/txt/1996/04/09/*.txt.gz")
// or even
val textFile = sc.textFile("s3n://mybucket/path/txt/*/*/*/*.txt.gz")
// which will take for ever
但是我如何“截获”阅读器来提供文件名呢?
或者我应该得到一个rdd的所有文件,按天分割,并减少步骤写出来 K=filename, V=fileContent
?
3条答案
按热度按时间bakd9h0s1#
按照你的规模,优雅的解决方案将是一个延伸。
我建议不要使用
sc.textFile("s3n://mybucket/path/txt/*/*/*/*.txt.gz")
因为这需要永远。您可以使用aws distcp或类似的方法将文件移动到hdfs中。一旦进入hdfs,spark就会以任何适合您的方式快速地接收信息。请注意,大多数这些过程都需要某种类型的文件列表,因此您需要以某种方式生成它。对于20 mil文件,创建文件列表将是一个瓶颈。我建议每次将文件上传到s3时,创建一个附加了文件路径的文件。
同样的输出,放入hdfs,然后移到s3(尽管直接拷贝可能同样有效)。
nkhmeac62#
你试过类似sc.wholetextfiles的东西吗?
它创建一个rdd,其中键是文件名,值是整个文件的字节数组。然后可以将其Map,使键为文件日期,然后再Mapgroupbykey?
http://spark.apache.org/docs/latest/programming-guide.html
hjzp0vay3#
你可以用这个
首先,您可以获得s3路径的缓冲区/列表:
现在将这个list对象传递给下面的代码段,注意:sc是sqlcontext的一个对象
现在你得到了最终的统一rdd,即df
可选,您还可以在一个bigrdd中重新划分它
重新分区始终有效:d