Apache Spark: reading a very large number of keys from an S3 bucket

h5qlskok asked on 2023-03-30 in Apache

I am trying to read and process the keys under the AWS S3 bucket/prefix s3://sorel-20m/09-DEC-2020/binaries/. This is a public bucket used for cyber security machine learning.
There are more than 13 million binary files under this prefix.
Here are some sample keys from the data:

09-DEC-2020/binaries/0000029bfead495a003e43a7ab8406c6209ffb7d5e59dd212607aa358bfd66ea
09-DEC-2020/binaries/000003b99c3d4b9860ad0b0ca43450603e5322f2cca3c9b3d543a2d6440305a0
09-DEC-2020/binaries/00000533148c26bcc09ab44b1acafe32dde93773d4a7e3dbd06c8232db5e437f
...
09-DEC-2020/binaries/fffffac77abc5f22baefd850a753b0e32a8c106f983f84f6b83fb20df465c7ab
09-DEC-2020/binaries/fffffd86f00a5b4547d3b99963cae39781fa015b3f869b3e232858dd6011d062
09-DEC-2020/binaries/fffffee23b47f84cfdf25c43af7707c8ffa94a974e5af9659e3ed67e2e30b80b

Just listing the files with an AWS CLI command such as aws s3 ls takes hours.
I tried filtering with the CLI --exclude/--include parameters:
aws s3 cp s3://sorel-20m/09-DEC-2020/binaries/ . --recursive --dryrun --exclude '*' --include '0000029*'
This started returning data quickly but did not complete. It looks like the CLI reads keys in alphabetical order, because when I search for keys at the end of the alphabetical ordering (those starting with 'fff'), the command below takes a long time before it returns any data:
aws s3 cp s3://sorel-20m/09-DEC-2020/binaries/ . --recursive --dryrun --exclude '*' --include 'fff*'
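By contrast, a server-side Prefix query (the same mechanism the boto3 approach at the end of this question relies on) starts directly at that point in the keyspace instead of scanning everything before it. A minimal sketch (credentials/region setup omitted):

import boto3

s3 = boto3.client('s3')
# Ask S3 for keys that start with '...binaries/fff'; the service seeks to
# that part of the keyspace rather than filtering client-side.
resp = s3.list_objects_v2(
    Bucket='sorel-20m',
    Prefix='09-DEC-2020/binaries/fff',
    MaxKeys=10,
)
for obj in resp.get('Contents', []):
    print(obj['Key'])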
I also tried the following AWS Glue (Spark-based) script. It timed out after 1 hour:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

df = spark.read.format("binaryFile").option('pathGlobFilter', '0000029*').option("wholeFile","true").load("s3://sorel-20m/09-DEC-2020/binaries")
#print(df.count())

df.select('Path').write.csv('s3://my-bucket')
job.commit()
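A related option (an assumption on my part, not something I have tested on this bucket): if the key list is obtained up front, the explicit paths can be passed to spark.read.load(), which also accepts a list, so Spark does not have to enumerate the whole 13-million-key prefix itself. A minimal sketch reusing the spark session from the script above:

import boto3

# Collect the matching keys with a server-side prefix query (the example
# prefix '0000029' from above), then hand the paths to Spark directly.
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
paths = []
for page in paginator.paginate(Bucket='sorel-20m',
                               Prefix='09-DEC-2020/binaries/0000029'):
    for obj in page.get('Contents', []):
        paths.append(f"s3://sorel-20m/{obj['Key']}")

df = spark.read.format("binaryFile").option("wholeFile", "true").load(paths)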

If I knew the minimum and maximum value of the first character of the keys I'm interested in, I could use boto3 for each first character, per https://stackoverflow.com/a/52450389/11262633:

bucket.objects.filter(Prefix=f'09-DEC-2020/binaries/{first_char}')

From the sample data above, the first character of the keys under the prefix 09-DEC-2020/binaries/ ranges from 0 to f.
So I could launch 16 parallel processes, one per character between 0 and f (a driver sketch follows the fragment below):

import boto3
import sys

session = boto3.Session() 
s3 = session.resource('s3')
bucket = s3.Bucket('sorel-20m')

# Assume this script is called with an argument between `0` and `f`
first_char = sys.argv[1]

prefix = f'09-DEC-2020/binaries/{first_char}' 

current_objects = bucket.objects.filter(Prefix=prefix)

...
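A driver for those 16 workers could also live in a single script instead of 16 separate invocations. A minimal sketch of that idea (the list_prefix helper is mine, not part of the original plan), which only collects the matching keys:

import boto3
from multiprocessing import Pool

def list_prefix(first_char):
    # Each worker lists one sixteenth of the keyspace via a server-side prefix
    bucket = boto3.Session().resource('s3').Bucket('sorel-20m')
    prefix = f'09-DEC-2020/binaries/{first_char}'
    return [obj.key for obj in bucket.objects.filter(Prefix=prefix)]

if __name__ == '__main__':
    with Pool(16) as pool:
        per_prefix = pool.map(list_prefix, '0123456789abcdef')
    keys = [key for chunk in per_prefix for key in chunk]
    print(len(keys))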

All of the processing is done on EC2, so my local machine's network bandwidth is not the bottleneck here.
My questions are:
Would you recommend this approach? If so, how can I make sure that every key starts with a character between 0 and f?

zqdjd7g9 (answer 1)

The underlying API that S3 uses to enumerate the contents of a bucket, ListObjectsV2, returns at most 1000 items per call, and paging through the results requires an opaque continuation token from the previous call. This means it is inherently impossible to fetch multiple pages of a single listing in parallel. However, when your objects are named with a predictable pattern, as you suggest, you can request multiple slices of the keyspace in parallel.
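For reference, sequential paging with that token looks roughly like this; each call needs the NextContinuationToken returned by the previous one, which is why a single listing cannot be parallelised:

import boto3

s3 = boto3.client('s3')
kwargs = {'Bucket': 'sorel-20m', 'Prefix': '09-DEC-2020/binaries/'}
while True:
    resp = s3.list_objects_v2(**kwargs)   # at most 1000 keys per call
    for obj in resp.get('Contents', []):
        print(obj['Key'])
    if not resp.get('IsTruncated'):
        break
    # the opaque token from this page is required to request the next page
    kwargs['ContinuationToken'] = resp['NextContinuationToken']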
My only suggestion would be to split the slices you request on more than just the first character, to allow for more than 16 possible workers. Also, check before and after the expected character range to verify there are no objects outside it.
Putting it all together, it would look something like this:

import boto3
import ctypes
import multiprocessing

def validate_hex_digits(s3, bucket, prefix):
    # Make sure the first character under the prefix is some hex digit
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    assert('0' <= resp['Contents'][0]['Key'][len(prefix)] <= 'f')
    # Make sure there's nothing after 'f'
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1, StartAfter=prefix+'g')
    assert('Contents' not in resp)

def worker(queue, queue_out, jobs):
    # A single worker, pull a job from 'queue', send results to 'queue_out'
    # Use jobs value to track the number of jobs in flight
    s3 = boto3.client('s3')
    while True:
        with jobs.get_lock():
            if jobs.value == 0:
                # Nothing left to do
                queue_out.put(None)
                break
            jobs.value -= 1
            bucket, prefix, token = queue.get()
        # Build up args for the call to list_objects
        args = {
            "Bucket": bucket,
            "Prefix": prefix,
        }
        if token is not None:
            args["ContinuationToken"] = token
        resp = s3.list_objects_v2(**args)
        if 'Contents' in resp:
            queue_out.put(resp['Contents'])
        if 'NextContinuationToken' in resp:
            # There's another page for this prefix, send it off
            # for the next available worker to pick up
            with jobs.get_lock():
                queue.put((bucket, prefix, resp['NextContinuationToken']))
                jobs.value += 1

def main():
    bucket = 'sorel-20m'
    prefix = '09-DEC-2020/binaries/'

    s3 = boto3.client('s3')

    # Verify all of the objects are at least two digit hex digits under the prefix
    validate_hex_digits(s3, bucket, prefix)
    for i in range(16):
        validate_hex_digits(s3, bucket, prefix + f"{i:x}")

    # If we get here, all the keys follow the pattern we expect for at 
    # least two digits.  Go ahead and use multi processing to pull down 
    # the list of objects as fast as possible

    # A queue to store work items
    queue = multiprocessing.Queue()
    # A queue to get pages of results
    queue_out = multiprocessing.Queue()
    # How many jobs are left to process?
    jobs = multiprocessing.Value(ctypes.c_int, 0)
    # Place some seeds in the queue for the first two hex characters
    for i in range(256):
        queue.put((bucket, prefix + f"{i:02x}", None))
        jobs.value += 1
    # Create and start some worker processes, two per CPU core
    # to allow for network wait time
    workers = multiprocessing.cpu_count() * 2
    procs = []
    for _ in range(workers):
        proc = multiprocessing.Process(target=worker, args=(queue, queue_out, jobs))
        proc.start()
        procs.append(proc)

    # While the workers are working, pull down pages and do something with them
    while workers > 0:
        result = queue_out.get()
        if result is None:
            # Signal that a worker finished
            workers -= 1
        else:
            for cur in result:
                # Just show the results like the AWS CLI does
                print(f"{cur['LastModified'].strftime('%Y-%m-%d %H:%M:%S')} {cur['Size']:10d} {cur['Key'][len(prefix):]}")

    # Clean up
    for proc in procs:
        proc.join()

if __name__ == "__main__":
    main()

On my machine this takes about 5 minutes to enumerate the objects in this bucket, whereas the AWS CLI took nearly an hour to do the same thing. Note that the results will come back in arbitrary order; if that matters, some post-enumeration sorting will be needed. Unless the bucket owner enables and publishes an S3 Inventory Report, you are unlikely to get the object listing much faster than this.
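If such an inventory were available, consuming it would amount to reading a set of listing files rather than calling the list API at all. A purely hypothetical sketch (the destination path is invented, since no inventory is published for sorel-20m, and 'key' is assumed to be the usual inventory column name):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Hypothetical: read an S3 Inventory output delivered in Parquet format
inv = spark.read.parquet("s3://my-inventory-bucket/sorel-20m-inventory/data/")
inv.filter(inv.key.startswith("09-DEC-2020/binaries/")) \
   .select("key") \
   .write.csv("s3://my-bucket/key-list")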
