我有下面的代码,并希望得到它返回一个dataframe正确。轮询逻辑工作,但数据框似乎没有被创建/返回。现在它在调用时只返回None。
import boto3
import pandas as pd
import io
import re
import time
AK='mykey'
SAK='mysecret'
params = {
'region': 'us-west-2',
'database': 'default',
'bucket': 'my-bucket',
'path': 'dailyreport',
'query': 'SELECT * FROM v_daily_report LIMIT 100'
}
session = boto3.Session(aws_access_key_id=AK,aws_secret_access_key=SAK)
# In[32]:
def athena_query(client, params):
response = client.start_query_execution(
QueryString=params["query"],
QueryExecutionContext={
'Database': params['database']
},
ResultConfiguration={
'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
}
)
return response
def athena_to_s3(session, params, max_execution = 5):
client = session.client('athena', region_name=params["region"])
execution = athena_query(client, params)
execution_id = execution['QueryExecutionId']
df = poll_status(execution_id, client)
return df
def poll_status(_id, client):
'''
poll query status
'''
result = client.get_query_execution(
QueryExecutionId = _id
)
state = result['QueryExecution']['Status']['State']
if state == 'SUCCEEDED':
print(state)
print(str(result))
s3_key = 's3://' + params['bucket'] + '/' + params['path']+'/'+ _id + '.csv'
print(s3_key)
df = pd.read_csv(s3_key)
return df
elif state == 'QUEUED':
print(state)
print(str(result))
time.sleep(1)
poll_status(_id, client)
elif state == 'RUNNING':
print(state)
print(str(result))
time.sleep(1)
poll_status(_id, client)
elif state == 'FAILED':
return result
else:
print(state)
raise Exception
df_data = athena_to_s3(session, params)
print(df_data)
我计划将dataframe加载移出轮询函数,但只是想让它像现在这样工作。
3条答案
按热度按时间bksxznpy1#
我建议你看看AWS Wrangler,而不是使用传统的boto 3 Athena API。这个更新和更具体的接口可以访问AWS中的所有数据,包括对Athena的查询,并提供更多功能。
感谢@RagePwn的评论,值得检查PyAthena作为boto 3选项的替代方案来查询Athena。
lhcgjxsq2#
如果它返回None,那是因为state == 'FAILED'。您需要调查失败的原因,可能在“StateChangeReason”中。
kh212irz3#
只是为了详细说明RagePwn使用
PyAthena
的答案-这也是我最终所做的。由于某种原因,AwsWrangler
卡住了,无法处理从S3返回的JSON。下面是基于PyAthena的PyPi页面的代码片段上面的代码假设您已经定义了以下环境变量:
*ATHENA_ACCESS_KEY:您AWS帐户的AWS访问密钥ID
*ATHENA_SECRET_KEY:AWS密钥
*雅典_地区_名称:AWS区域名称
*ATHENA_STAGING_BUCKET:同一帐户中具有正确访问设置的存储桶(对此的解释不在本答案的范围内)