Unable to read Athena query into a pandas dataframe

ukdjmx9f asked on 2023-05-05 in Other

I have the code below and would like it to correctly return a dataframe. The polling logic works, but the dataframe doesn't seem to be created/returned. Right now it just returns None when called.

import boto3
import pandas as pd
import io
import re
import time

AK='mykey'
SAK='mysecret'

params = {
    'region': 'us-west-2',
    'database': 'default',
    'bucket': 'my-bucket',
    'path': 'dailyreport',
    'query': 'SELECT * FROM v_daily_report LIMIT 100'
}

session = boto3.Session(aws_access_key_id=AK,aws_secret_access_key=SAK)


def athena_query(client, params):

    response = client.start_query_execution(
        QueryString=params["query"],
        QueryExecutionContext={
            'Database': params['database']
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        }
    )
    return response

def athena_to_s3(session, params, max_execution = 5):
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']
    df = poll_status(execution_id, client)
    return df

def poll_status(_id, client):
    '''
    poll query status
    '''
    result = client.get_query_execution(
        QueryExecutionId = _id
    )

    state = result['QueryExecution']['Status']['State']
    if state == 'SUCCEEDED':
        print(state)
        print(str(result))
        s3_key = 's3://' + params['bucket'] + '/' + params['path']+'/'+ _id + '.csv'
        print(s3_key)
        df = pd.read_csv(s3_key)
        return df
    elif state == 'QUEUED':
        print(state)
        print(str(result))
        time.sleep(1)
        poll_status(_id, client)
    elif state == 'RUNNING':
        print(state)
        print(str(result))
        time.sleep(1)
        poll_status(_id, client)
    elif state == 'FAILED':
        return result
    else:
        print(state)
        raise Exception

df_data = athena_to_s3(session, params)

print(df_data)

I plan to move the dataframe loading out of the polling function, but for now I just want to get it working as it is.


bksxznpy1#

I'd suggest looking at AWS Wrangler instead of using the traditional boto3 Athena API. This newer and more specific interface to everything around data in AWS, including queries to Athena, offers more functionality.

import awswrangler as wr

# in current awswrangler releases this lives in the athena module
df = wr.athena.read_sql_query(
    sql="select * from table",
    database="database"
)

Thanks to @RagePwn's comment, it is also worth checking PyAthena as an alternative to boto3 for querying Athena.


lhcgjxsq2#

If it is returning None, that's because state == 'FAILED'. You need to investigate the reason it failed, which may be in 'StateChangeReason'.

{
    'QueryExecution': {
        'QueryExecutionId': 'string',
        'Query': 'string',
        'StatementType': 'DDL'|'DML'|'UTILITY',
        'ResultConfiguration': {
            'OutputLocation': 'string',
            'EncryptionConfiguration': {
                'EncryptionOption': 'SSE_S3'|'SSE_KMS'|'CSE_KMS',
                'KmsKey': 'string'
            }
        },
        'QueryExecutionContext': {
            'Database': 'string'
        },
        'Status': {
            'State': 'QUEUED'|'RUNNING'|'SUCCEEDED'|'FAILED'|'CANCELLED',
            'StateChangeReason': 'string',
            'SubmissionDateTime': datetime(2015, 1, 1),
            'CompletionDateTime': datetime(2015, 1, 1)
        },
        'Statistics': {
            'EngineExecutionTimeInMillis': 123,
            'DataScannedInBytes': 123,
            'DataManifestLocation': 'string',
            'TotalExecutionTimeInMillis': 123,
            'QueryQueueTimeInMillis': 123,
            'QueryPlanningTimeInMillis': 123,
            'ServiceProcessingTimeInMillis': 123
        },
        'WorkGroup': 'string'
    }
}
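As a minimal sketch of that check, the helper below pulls the failure reason out of a `get_query_execution` response. The `example` dict is a hand-made illustration shaped like the structure above, not real Athena output:

```python
def failure_reason(response):
    """Return the StateChangeReason if the query FAILED, else None."""
    status = response['QueryExecution']['Status']
    if status['State'] == 'FAILED':
        return status.get('StateChangeReason', 'no reason given')
    return None

# Hand-made example response for illustration only
example = {
    'QueryExecution': {
        'Status': {
            'State': 'FAILED',
            'StateChangeReason': 'SYNTAX_ERROR: Table default.v_daily_report does not exist',
        }
    }
}

print(failure_reason(example))
```

Logging this reason on the FAILED branch (instead of silently returning the raw result) makes the root cause visible immediately.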

kh212irz3#

Just to elaborate on RagePwn's answer about using PyAthena, that's what I ended up doing as well. For some reason AWS Wrangler choked and couldn't handle the JSON that was being returned from S3. Here's the code snippet that worked for me, based on PyAthena's PyPI page.

import os
from pyathena import connect
from pyathena.util import as_pandas

aws_access_key_id = os.getenv('ATHENA_ACCESS_KEY')
aws_secret_access_key = os.getenv('ATHENA_SECRET_KEY')
region_name = os.getenv('ATHENA_REGION_NAME')
staging_bucket_dir = os.getenv('ATHENA_STAGING_BUCKET')

cursor = connect(aws_access_key_id=aws_access_key_id,
                 aws_secret_access_key=aws_secret_access_key,
                 region_name=region_name,
                 s3_staging_dir=staging_bucket_dir,
                ).cursor()
sql = 'SELECT * FROM v_daily_report LIMIT 100'  # any valid Athena query
cursor.execute(sql)
df = as_pandas(cursor)

The code above assumes the following environment variables have been defined:

* ATHENA_ACCESS_KEY: the AWS access key id for your AWS account
* ATHENA_SECRET_KEY: the AWS secret key
* ATHENA_REGION_NAME: the AWS region name
* ATHENA_STAGING_BUCKET: a bucket in the same account that has the correct access settings (an explanation of which is outside the scope of this answer)
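For completeness, those four variables can be exported in the shell before running the script. The values below are placeholders taken from the question's example, not real credentials:

```shell
# Placeholder values -- substitute your own credentials, region, and bucket
export ATHENA_ACCESS_KEY="mykey"
export ATHENA_SECRET_KEY="mysecret"
export ATHENA_REGION_NAME="us-west-2"
export ATHENA_STAGING_BUCKET="s3://my-bucket/dailyreport/"
```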
