我试图通过所有的csv文件,我有一个AWS S3桶来抓取这些csv文件中的所有数据,并将它们放入一个dataframe。我提供的代码首先抓取所有的csv文件名,然后抓取每个单独的文件,并通过csv.reader
运行它来抓取数据,然后将它们放入一个列表中,然后从它创建一个dataframe。我的问题是代码跳过了大于100 KB的文件,我的一些文件大于300 KB。我试图抓住每一个有KB数据的文件,然后放入一个 Dataframe 。
这是我的代码:
# Set the S3 bucket and directory path where CSV files are stored
aws_access_key_id ='XXXXXXXXXX'
aws_secret_access_key='XXXXXXXXXXXXXX'
s3_bucket_name = 'arcodp'
folder_name = 'lab_data/'
# Get a list of all CSV files in the S3 bucket directory
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=s3_bucket_name, Prefix=folder_name)
csv_files = [obj['Key'] for page in pages for obj in page['Contents'] if obj['Key'].endswith('.csv')]
# Create an empty list to store the dataframes
df_list = []
ARCID_lst = []
# Read each CSV file into a dataframe and append it to the df_list
for file in csv_files:
try:
response = s3.get_object(Bucket=s3_bucket_name, Key=file)
data = response['Body'].read().decode('utf-8')
# Read the CSV file line by line and append each line to a list
rows_list = []
csv_reader = csv.reader(data.splitlines(), delimiter='|', quoting=csv.QUOTE_NONE)
for row in csv_reader:
rows_list.append(row)
df_list.extend(rows_list)
except:
ARCID_no_hit = file.split('/')[1].split('_')[0]
ARCID_lst.append(ARCID_no_hit)
# Convert the list of rows into a pandas dataframe
df_par = pd.DataFrame(df_list)
# Print the first 5 rows of the combined dataframe
df_par[0:10]
是否有一个关键字参数为csv.reader
为了读取更大的文件?我还没有找到任何在线满足此参数.我也尝试使用dask
,但使用此代码,我只得到No such file or directory: '/user/user/documents/extract_data/"1000231"|"None"|"20221130".
文件不在我的本地计算机上,所以不知道为什么会发生这种情况.这里是代码```dask`:
# Set the S3 bucket and directory path where CSV files are stored
aws_access_key_id ='XXXXXXXXXXXXX'
aws_secret_access_key='XXXXXXXXXX'
s3_bucket_name = 'arcodp'
folder_name = 'lab_data/'
# Get a list of all CSV files in the S3 bucket directory
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=s3_bucket_name, Prefix=folder_name)
csv_files = [obj['Key'] for page in pages for obj in page['Contents'] if obj['Key'].endswith('.csv')]
# Create an empty list to store the dataframes
df_list = []
ARCID_lst = []
for file in csv_files:
try:
response = s3.get_object(Bucket=s3_bucket_name, Key=file)
data = response['Body'].read().decode('utf-8')
# Create a delayed Dask dataframe for each CSV file
df = delayed(dd.read_csv)(data, sep='|', header=None, blocksize=None, quoting=csv.QUOTE_NONE, engine='c')
df_list.append(df)
except:
ARCID_no_hit = file.split('/')[1].split('_')[0]
ARCID_lst.append(ARCID_no_hit)
# Combine all delayed Dask dataframes into a single Dask dataframe
df_combined = dd.from_delayed(df_list)
# Compute the final pandas dataframe
df_par = df_combined.compute()
# Print the first 5 rows of the combined dataframe
df_par.head()
1条答案
按热度按时间c3frrgcw1#
下面是使用dask完成此操作的代码
就这样