如何从AWS S3存储桶中获取较大的csv文件?

ghhkc1vu  于 2023-04-09  发布在  其他
关注(0)|答案(1)|浏览(141)

我试图通过所有的csv文件,我有一个AWS S3桶来抓取这些csv文件中的所有数据,并将它们放入一个dataframe。我提供的代码首先抓取所有的csv文件名,然后抓取每个单独的文件,并通过csv.reader运行它来抓取数据,然后将它们放入一个列表中,然后从它创建一个dataframe。我的问题是代码跳过了大于100 KB的文件,我的一些文件大于300 KB。我试图抓住每一个有KB数据的文件,然后放入一个 Dataframe 。
这是我的代码:

# Set the S3 bucket and directory path where CSV files are stored
aws_access_key_id ='XXXXXXXXXX'
aws_secret_access_key='XXXXXXXXXXXXXX'
s3_bucket_name = 'arcodp'
folder_name = 'lab_data/'

# Get a list of all CSV files in the S3 bucket directory
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)


paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=s3_bucket_name, Prefix=folder_name)


csv_files = [obj['Key'] for page in pages for obj in page['Contents'] if obj['Key'].endswith('.csv')]

# Create an empty list to store the dataframes
df_list = []
ARCID_lst =  []
# Read each CSV file into a dataframe and append it to the df_list
for file in csv_files:
    try: 
        response = s3.get_object(Bucket=s3_bucket_name, Key=file)
        data = response['Body'].read().decode('utf-8')
    
        # Read the CSV file line by line and append each line to a list
        rows_list = []
        csv_reader = csv.reader(data.splitlines(), delimiter='|', quoting=csv.QUOTE_NONE)
        for row in csv_reader:
            rows_list.append(row)

        df_list.extend(rows_list)
    except:
        ARCID_no_hit = file.split('/')[1].split('_')[0]
        ARCID_lst.append(ARCID_no_hit)

# Convert the list of rows into a pandas dataframe
df_par = pd.DataFrame(df_list)

    # Print the first 5 rows of the combined dataframe
df_par[0:10]

是否有一个关键字参数为csv.reader为了读取更大的文件?我还没有找到任何在线满足此参数.我也尝试使用dask,但使用此代码,我只得到No such file or directory: '/user/user/documents/extract_data/"1000231"|"None"|"20221130".文件不在我的本地计算机上,所以不知道为什么会发生这种情况.这里是代码```dask`:

# Set the S3 bucket and directory path where CSV files are stored
aws_access_key_id ='XXXXXXXXXXXXX'
aws_secret_access_key='XXXXXXXXXX'
s3_bucket_name = 'arcodp'
folder_name = 'lab_data/'

# Get a list of all CSV files in the S3 bucket directory
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)


paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=s3_bucket_name, Prefix=folder_name)


csv_files = [obj['Key'] for page in pages for obj in page['Contents'] if obj['Key'].endswith('.csv')]

# Create an empty list to store the dataframes

df_list = []
ARCID_lst =  []
for file in csv_files:
    try:
        response = s3.get_object(Bucket=s3_bucket_name, Key=file)
        data = response['Body'].read().decode('utf-8')
        
        # Create a delayed Dask dataframe for each CSV file
        df = delayed(dd.read_csv)(data, sep='|', header=None, blocksize=None, quoting=csv.QUOTE_NONE, engine='c')
        df_list.append(df)
    except:
        ARCID_no_hit = file.split('/')[1].split('_')[0]
        ARCID_lst.append(ARCID_no_hit)

# Combine all delayed Dask dataframes into a single Dask dataframe
df_combined = dd.from_delayed(df_list)

# Compute the final pandas dataframe
df_par = df_combined.compute()

# Print the first 5 rows of the combined dataframe
df_par.head()
c3frrgcw

c3frrgcw1#

下面是使用dask完成此操作的代码

import dask.dataframe as dd
df = dd.read_csv("s3://{s3_bucket_name}/{folder_name}/*.csv", 
    storage_options=dict(key='XXXXXXXXXXXXX', secret='XXXXXXXXXX',
    sep='|', ...)

就这样

相关问题