如何使用spark在s3中绑定多个文件

tjvv9vkg  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(305)

我在s3中有2000万个文件,大约8000天。
文件按utc时间戳组织,如下所示: s3://mybucket/path/txt/YYYY/MM/DD/filename.txt.gz . 每个文件都是utf-8文本,包含0(空)到100kb的文本(第95个百分位,尽管有一些文件高达几mbs)。
使用spark和scala(我对这两个都是新手,想学习),我想保存“daily bundle”(其中8000个),每个bundle包含当天找到的任何数量的文件。理想情况下,我想存储原始文件名以及它们的内容。输出应该驻留在s3以及压缩,在一些格式,适合在进一步的Spark步骤和实验输入。
一个想法是将bundle存储为一堆json对象(每行一个,每行一个) '\n' -分离),例如。

{id:"doc0001", meta:{x:"blah", y:"foo", ...}, content:"some long string here"}
{id:"doc0002", meta:{x:"foo", y:"bar", ...}, content: "another long string"}

或者,我可以尝试hadoop sequencefile,但我不知道如何优雅地设置它。
以spark shell为例,我发现读取文件非常容易,例如:

val textFile = sc.textFile("s3n://mybucket/path/txt/1996/04/09/*.txt.gz")
// or even
val textFile = sc.textFile("s3n://mybucket/path/txt/*/*/*/*.txt.gz")
// which will take for ever

但是我如何“截获”阅读器来提供文件名呢?
或者我应该得到一个rdd的所有文件,按天分割,并减少步骤写出来 K=filename, V=fileContent ?

bakd9h0s

bakd9h0s1#

按照你的规模,优雅的解决方案将是一个延伸。
我建议不要使用 sc.textFile("s3n://mybucket/path/txt/*/*/*/*.txt.gz") 因为这需要永远。您可以使用aws distcp或类似的方法将文件移动到hdfs中。一旦进入hdfs,spark就会以任何适合您的方式快速地接收信息。
请注意,大多数这些过程都需要某种类型的文件列表,因此您需要以某种方式生成它。对于20 mil文件,创建文件列表将是一个瓶颈。我建议每次将文件上传到s3时,创建一个附加了文件路径的文件。
同样的输出,放入hdfs,然后移到s3(尽管直接拷贝可能同样有效)。

nkhmeac6

nkhmeac62#

你试过类似sc.wholetextfiles的东西吗?
它创建一个rdd,其中键是文件名,值是整个文件的字节数组。然后可以将其Map,使键为文件日期,然后再Mapgroupbykey?
http://spark.apache.org/docs/latest/programming-guide.html

hjzp0vay

hjzp0vay3#

你可以用这个
首先,您可以获得s3路径的缓冲区/列表:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

现在将这个list对象传递给下面的代码段,注意:sc是sqlcontext的一个对象

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

现在你得到了最终的统一rdd,即df
可选,您还可以在一个bigrdd中重新划分它

val files = sc.textFile(filename, 1).repartition(1)

重新分区始终有效:d

相关问题