Snowflake Python 连接器:同时发出多个请求时出现错误 604

wxclj1h5  于 2021-07-24  发布在  Java
关注(0)|答案(0)|浏览(219)

我有一个 Azure 函数,它使用 Snowflake Python 连接器向 Snowflake 发送查询。它打开一个连接,创建一个游标,使用 _no_results=True 发送查询,并且不检查查询是否成功。单独运行时,它运行良好。但是,当我用它同时运行多个查询时,有些查询会随机失败,状态代码为 604:查询执行已取消。是否存在某种并发限制?我在文档中找不到任何相关信息。发送的查询非常简单(TRUNCATE TABLE x),并且没有超时。
我的代码附在下面。

import logging
import json
import time
import gc
from flatten_json import flatten
import os
import snowflake.connector
import azure.functions as func

def main(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP-triggered entry point: submit one SQL statement to Snowflake.

    Reads the JSON request body, builds the statement with
    ``create_sql_statement``, opens a Snowflake connection and passes the
    statement to ``query_snowflake``.

    Returns:
        A 200 response whose JSON body carries ``TaskName``, ``Status``,
        ``TaskKey``, ``Query_ID`` and ``Session_ID`` when the statement was
        handed to Snowflake; a 400 response with ``Status`` set to
        ``'Failed'`` and the exception text in ``Error_Message`` when
        ``query_snowflake`` raises.

    Raises:
        Any exception raised while parsing the request, reading its keys,
        building the statement or connecting; those steps are outside the
        guarded block (the connect handler only re-raises).
    """
    logging.info('Python HTTP trigger function processed a request.')

    # Deserialize request body
    req_body = req.get_json()
    logging.info('Deserialized input successfully. Request body: '  +  json.dumps(req_body))

    # Result JSON returned to the caller. "Status" starts as "Complete" and is
    # only overwritten on failure; the two IDs are filled in after submission.
    result = {
        "TaskName":req_body['TaskName'],
        "Status":"Complete",
        "TaskKey": req_body['TaskKey'],
        "Query_ID":"",
        "Session_ID":""
        }

    # Snowflake connection parameters. The <...> values are redacted
    # placeholders from the post and are not valid Python as written.
    USER = <sfusername>
    PASSWD = <sfpw>
    ACCOUNT = <sfAcc>
    WAREHOUSE = <sfwh>
    DATABASE = <sfdb>

    logging.info('Connection string created')

    # Substitute the request's parameters into its SQL template.
    copy_sql_statement = create_sql_statement(req_body)
    logging.info('Insert SQL Statement: ' + copy_sql_statement)

    logging.info('Attempting to Connect to Snowflake...')    
    try:
        # Try to connect to Snowflake
        connection = snowflake.connector.connect(user=USER, password=PASSWD, account=ACCOUNT, warehouse=WAREHOUSE, database=DATABASE)
        logging.info('Connection Successful')
    except Exception as e:
        # Connection failures are re-raised unchanged, so they do not produce
        # the JSON error body built below.
        raise e

    # NOTE(review): nothing in this function closes `connection`; confirm
    # whether that is intentional given that the statement is submitted with
    # _no_results=True and may still be running when this function returns.
    logging.info('Try block for query_snowflake started.')
    try:
        # Submit the statement; query_snowflake returns [query id, session id].
        output_list = query_snowflake(req_body, connection, copy_sql_statement) #return queryid and sessionid from Snowflake
        queryid = output_list[0]
        sessionid = output_list[1]
        result['Query_ID']= queryid
        result['Session_ID']= sessionid
        logging.info('Query sent to Snowflake Successfully.')
        return func.HttpResponse(json.dumps(result), status_code = 200)
    except Exception as e:
        # Any failure while submitting is reported to the caller as a 400
        # with the exception text, rather than propagated.
        result['Status'] = 'Failed'
        result['Error_Message'] = str(e)
        logging.info('Copy Into function failed.  Error: ' + str(e))
        return func.HttpResponse(json.dumps(result),
            status_code=400)

def create_sql_statement(req_body):
    """Fill the request's ``InsertSQL`` template with its parameter values.

    The placeholders ``@TaskKey``, ``@CDCMinDate`` and ``@CDCMaxDate`` are
    replaced, in that order, by the request values stored under ``TaskKey``,
    ``CDCMinDate`` and ``CDCMaxDate``. Returns the resulting SQL text.
    """
    statement = req_body['InsertSQL']
    # Order matters: each substitution runs over the output of the previous one.
    for field in ('TaskKey', 'CDCMinDate', 'CDCMaxDate'):
        statement = statement.replace('@' + field, req_body[field])
    return statement

def query_snowflake(req_body, connection, copy_sql_statement):
    """Submit ``copy_sql_statement`` on ``connection`` and return its IDs.

    Args:
        req_body: Request payload. Kept for interface compatibility; it is
            not used here.
        connection: Open Snowflake connection. It is left open for the caller.
        copy_sql_statement: SQL text to submit.

    Returns:
        A two-item list ``[query_id, session_id]``: the cursor's ``sfqid``
        after the submit, and the value returned by
        ``select current_session()``.

    Raises:
        Any exception raised by the connector propagates unchanged.
    """
    cur = connection.cursor()
    session_row = cur.execute("select current_session()").fetchone()

    # _no_results=True: the statement's results are not fetched here --
    # presumably the call returns once the statement has been submitted.
    # NOTE(review): confirm against the connector documentation.
    cur.execute(copy_sql_statement, _no_results=True)

    # The cursor and connection are deliberately not closed here. The cleanup
    # lines in the original sat after the return/raise (their `finally:` was
    # commented out) and never ran, so callers have always received a
    # still-open connection.
    return [cur.sfqid, session_row[0]]

新代码:

import logging
import json
import time
import gc
from flatten_json import flatten
import os
import snowflake.connector
import azure.functions as func

def main(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP-triggered entry point: submit a SQL statement and confirm it arrived.

    Reads the JSON request body, builds the statement with
    ``create_sql_statement``, opens a Snowflake connection, submits the
    statement through ``query_snowflake`` and then polls the session's query
    history (``create_status_statement`` / ``get_query_status``) until the
    statement is no longer reported as pending.

    Returns:
        * 400 with ``Status`` ``'Failed'`` and ``Error_Message``
          ``'SQL Execution Canceled'`` when the reported error code is
          ``'604'``.
        * 200 with ``Status`` set to the reported execution status as soon as
          that status is anything other than ``'PENDING'``.
        * 200 with ``Status`` ``'Success'`` when every polling attempt still
          reports pending.
        * 400 with ``Status`` ``'Failed'`` and the exception text in
          ``Error_Message`` when submitting or polling raises.

    Raises:
        Any exception raised while parsing the request, reading its keys,
        building the statement or connecting; those steps are outside the
        guarded block (the connect handler only re-raises).
    """
    logging.info('Python HTTP trigger function processed a request.')

    # Deserialize request body
    req_body = req.get_json()
    logging.info('Deserialized input successfully. Request body: '  +  json.dumps(req_body))

    # Result JSON returned to the caller. "Status" starts as "Complete" and is
    # overwritten by every return path below.
    result = {
        "TaskName":req_body['TaskName'],
        "Status":"Complete",
        "TaskKey": req_body['TaskKey'],
        "Query_ID":"",
        "Session_ID":"",
        "Error_Message":""
        }

    # Snowflake connection parameters. The <...> values are redacted
    # placeholders from the post and are not valid Python as written.
    USER = <sfusername>
    PASSWD = <sfpw>
    ACCOUNT = <sfAcc>
    WAREHOUSE = <sfwh>
    DATABASE = <sfdb>

    logging.info('Connection string created')

    # Substitute the request's parameters into its SQL template.
    copy_sql_statement = create_sql_statement(req_body)
    logging.info('SQL Statement: ' + copy_sql_statement)

    logging.info('Attempting to Connect to Snowflake...')    
    try:
        # Try to connect to Snowflake
        connection = snowflake.connector.connect(user=USER, password=PASSWD, account=ACCOUNT, warehouse=WAREHOUSE, database=DATABASE)
        logging.info('Connection Successful')
    except Exception as e:
        # Connection failures are re-raised unchanged, so they do not produce
        # the JSON error body built below.
        raise e

    # NOTE(review): nothing in this function closes `connection`; confirm
    # whether that is intentional.
    logging.info('Try block for send query started.')
    try:
        # Submit the statement; query_snowflake returns [query id, session id].
        logging.info('Sending Query to Snowflake...')
        output_list = query_snowflake(req_body, connection, copy_sql_statement) #return queryid and sessionid from Snowflake
        queryid = output_list[0]
        sessionid = output_list[1]
        result['Query_ID']= queryid
        result['Session_ID']= sessionid

        logging.info('Ensuring Query was Sent...')
        # The status query is built once and re-run on every polling attempt.
        status_stmt = create_status_statement(queryid, sessionid)
        # 13 attempts. Each sleeps 5 s before checking and a further 10 s when
        # the query is still pending, so the loop lasts at most about
        # 13 * 15 s = 3.25 min.
        for x in range(1,14):

            time.sleep(5)
            # returnValues is (execution status, error code, query id).
            returnValues = get_query_status(status_stmt, connection) 

            # Error code 604 means the query was canceled: report failure.
            if returnValues[1] == '604':
                result['Status'] = 'Failed'
                result['Error_Message'] = 'SQL Execution Canceled'
                return func.HttpResponse(json.dumps(result), status_code = 400)

            # Any status other than 'PENDING' means the query reached
            # Snowflake; a second function is responsible for its outcome.
            elif returnValues[0] != 'PENDING':
                result['Status'] = returnValues[0]
                logging.info('Query sent to Snowflake Successfully.')
                return func.HttpResponse(json.dumps(result), status_code = 200)

            else:
                logging.info('Loop ' + str(x) + ' completed, trying again...')
                time.sleep(10)

        # Still pending after every attempt: report success and let the
        # second function surface any later failure.
        result['Status'] = 'Success'
        return func.HttpResponse(json.dumps(result), status_code = 200)

    except Exception as e:
        # Any failure while submitting or polling is reported to the caller
        # as a 400 with the exception text, rather than propagated.
        result['Status'] = 'Failed'
        result['Error_Message'] = str(e)
        logging.info('Copy Into function failed.  Error: ' + str(e))
        return func.HttpResponse(json.dumps(result),
            status_code=400)

def create_sql_statement(req_body):
    """Fill the request's ``InsertSQL`` template with its parameter values.

    The placeholders ``@TaskKey``, ``@CDCMinDate`` and ``@CDCMaxDate`` are
    replaced, in that order, by the request values stored under ``TaskKey``,
    ``CDCMinDate`` and ``CDCMaxDate``. Non-string values (for example a
    numeric ``TaskKey`` in the JSON body) are converted with ``str()``.

    Args:
        req_body: Mapping with the keys ``InsertSQL``, ``TaskKey``,
            ``CDCMinDate`` and ``CDCMaxDate``.

    Returns:
        The SQL text with all three placeholders substituted.

    Raises:
        KeyError: A required key is missing from ``req_body``.
        TypeError: A parameter value is ``None``.
    """
    # NOTE(review): the template and the values are spliced together as plain
    # text with no escaping, so this is only safe if every caller that can
    # supply the request body is trusted.
    statement = req_body['InsertSQL']
    # Order matters: each substitution runs over the output of the previous one.
    for field in ('TaskKey', 'CDCMinDate', 'CDCMaxDate'):
        value = req_body[field]
        if value is None:
            # Refuse to write the literal text "None" into the statement.
            raise TypeError('request field %r must not be null' % field)
        statement = statement.replace('@' + field, str(value))
    return statement

def query_snowflake(req_body, connection, copy_sql_statement):
    """Submit ``copy_sql_statement`` on ``connection`` and return its IDs.

    Args:
        req_body: Request payload. Kept for interface compatibility; it is
            not used here.
        connection: Open Snowflake connection. It is left open so the caller
            can keep using it.
        copy_sql_statement: SQL text to submit.

    Returns:
        A two-item list ``[query_id, session_id]``: the cursor's ``sfqid``
        after the submit, and the value returned by
        ``select current_session()``.

    Raises:
        Any exception raised by the connector propagates unchanged. The
        previous catch-and-reraise handler added nothing and was removed.
    """
    cursor = connection.cursor()
    # Capture the session id first so the caller can later look the statement
    # up in this session's query history.
    session_row = cursor.execute("select current_session()").fetchone()

    # _no_results=True: the statement's results are not fetched here --
    # presumably the call returns once the statement has been submitted.
    # NOTE(review): confirm against the connector documentation.
    cursor.execute(copy_sql_statement, _no_results=True)

    return [cursor.sfqid, session_row[0]]

def create_status_statement(queryid, sessionid):
    """Build the SQL that looks up one query in a session's query history.

    Args:
        queryid: Query ID to look for; compared to ``QUERY_ID`` as a quoted
            string.
        sessionid: Session ID passed to ``QUERY_HISTORY_BY_SESSION``. May be
            a string or an integer; it is converted with ``str()``.

    Returns:
        SQL text selecting ``execution_status``, ``error_code`` and
        ``query_id`` for the given query.
    """
    # Both IDs are converted with str() so an integer session id no longer
    # raises TypeError on concatenation.
    # The backslash continuations sit inside the string literal, so the next
    # line's leading spaces become part of the SQL text; they are harmless and
    # kept unchanged.
    sql_statement = "SELECT execution_status, error_code, query_id \
            FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY_BY_SESSION(SESSION_ID => " + str(sessionid) + ")) \
            WHERE QUERY_ID = '" + str(queryid) + "'"
    return sql_statement

def get_query_status(etl_sql_statement, conn):
    """Run a status query and return its first row as three strings.

    Args:
        etl_sql_statement: SQL whose first three result columns are the
            execution status, the error code and the query id.
        conn: Open connection; only ``conn.cursor()`` is used.

    Returns:
        ``(status, error_code, query_id)``, each converted with ``str()``.
        When the query returns no row, the result is ``('PENDING', '', '')``.
        The sentinel is placed in the status slot so that callers comparing
        the first item to ``'PENDING'`` see it.

    Raises:
        Any exception from the connector; it is logged and re-raised.
    """
    # Start as None so the cleanup below is safe even if cursor() itself
    # fails; otherwise the original error would be hidden by an
    # UnboundLocalError raised from the finally clause.
    cur = None
    try:
        cur = conn.cursor()
        row = cur.execute(etl_sql_statement).fetchone()

        if row is None:
            # No history row for the query yet: report it as still pending.
            return ('PENDING', '', '')
        return (str(row[0]), str(row[1]), str(row[2]))

    except Exception as e:
        logging.info('Failed to get query status. Error: ' + str(e))
        raise

    finally:
        # Close and dispose of the cursor on every path.
        if cur is not None:
            cur.close()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题