我有一个 Azure 函数,它使用 Snowflake Python 连接器(snowflake-connector-python)向 Snowflake 发送查询。它打开一个连接,创建一个游标,以 `_no_results=True` 发送查询,不等待结果,也不检查查询是否执行成功。单独运行时一切正常。但是,当我用它同时运行多个查询时,有些查询会随机失败,错误代码为 604:SQL execution canceled(SQL 执行已取消)。是否存在某种并发限制?我在文档中找不到任何相关信息。发送的查询非常简单(`TRUNCATE TABLE x`),也没有设置超时。
我的代码附在下面。
import logging
import json
import time
import gc
from flatten_json import flatten
import os
import snowflake.connector
import azure.functions as func
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
# Deserialize request body
req_body = req.get_json()
logging.info('Deserialized input successfully. Request body: ' + json.dumps(req_body))
#Create result JSON to be returned to caller
result = {
"TaskName":req_body['TaskName'],
"Status":"Complete",
"TaskKey": req_body['TaskKey'],
"Query_ID":"",
"Session_ID":""
}
#Create the Snowflake parameters for connection
USER = <sfusername>
PASSWD = <sfpw>
ACCOUNT = <sfAcc>
WAREHOUSE = <sfwh>
DATABASE = <sfdb>
logging.info('Connection string created')
copy_sql_statement = create_sql_statement(req_body)
logging.info('Insert SQL Statement: ' + copy_sql_statement)
logging.info('Attempting to Connect to Snowflake...')
try:
# Try to connect to Snowflake
connection = snowflake.connector.connect(user=USER, password=PASSWD, account=ACCOUNT, warehouse=WAREHOUSE, database=DATABASE)
logging.info('Connection Successful')
except Exception as e:
raise e
logging.info('Try block for query_snowflake started.')
try:
# Call function to execute copy into
output_list = query_snowflake(req_body, connection, copy_sql_statement) #return queryid and sessionid from Snowflake
queryid = output_list[0]
sessionid = output_list[1]
result['Query_ID']= queryid
result['Session_ID']= sessionid
logging.info('Query sent to Snowflake Successfully.')
return func.HttpResponse(json.dumps(result), status_code = 200)
except Exception as e:
result['Status'] = 'Failed'
result['Error_Message'] = str(e)
logging.info('Copy Into function failed. Error: ' + str(e))
return func.HttpResponse(json.dumps(result),
status_code=400)
def create_sql_statement(req_body):
    """Expand the ``@TaskKey``, ``@CDCMinDate`` and ``@CDCMaxDate`` placeholders.

    Args:
        req_body: Deserialized request body. ``InsertSQL`` holds the SQL
            text; ``TaskKey``, ``CDCMinDate`` and ``CDCMaxDate`` hold the
            replacement values.

    Returns:
        ``InsertSQL`` with each placeholder replaced by ``str()`` of its
        value. Converting with ``str`` lets non-string JSON values, such as
        a numeric ``TaskKey``, be used instead of making ``str.replace``
        raise ``TypeError``. A placeholder that does not occur in the SQL is
        skipped, so its key may be omitted.

    Raises:
        KeyError: if ``InsertSQL`` is missing, or if a placeholder occurs in
            the SQL but its key is missing.
    """
    # NOTE(review): the caller supplies the whole SQL text and it is executed
    # as-is, so this endpoint must only be reachable by trusted callers.
    sql = req_body['InsertSQL']
    # Same order as the original chained .replace() calls; order only matters
    # if a substituted value itself contains a later placeholder.
    for placeholder, key in (
        ('@TaskKey', 'TaskKey'),
        ('@CDCMinDate', 'CDCMinDate'),
        ('@CDCMaxDate', 'CDCMaxDate'),
    ):
        if placeholder in sql:
            sql = sql.replace(placeholder, str(req_body[key]))
    return sql
def query_snowflake(req_body, connection, copy_sql_statement):
    """Submit *copy_sql_statement* on *connection* with ``_no_results=True``.

    Args:
        req_body: Deserialized request body. Not used here; kept so existing
            callers keep working.
        connection: An open Snowflake connection. It is owned by the caller
            and is not closed here.
        copy_sql_statement: SQL text to submit.

    Returns:
        ``[query_id, session_id]``: the cursor's ``sfqid`` after submission
        and the value of ``current_session()``.

    Raises:
        Whatever the connector raises from ``cursor()`` / ``execute()``.
    """
    cur = connection.cursor()
    # Capture the session id so the statement can be looked up later.
    sessionid = cur.execute("select current_session()").fetchone()
    # _no_results=True: presumably submits without fetching a result set, so
    # this returns before the statement is guaranteed to have finished.
    cur.execute(copy_sql_statement, _no_results=True)
    # The former cursor/connection close calls sat after `return` and never
    # ran. NOTE(review): closing the connection right after submission may
    # cancel the in-flight statement; confirm connector behaviour before
    # adding cleanup here.
    return [cur.sfqid, sessionid[0]]
新代码:
import logging
import json
import time
import gc
from flatten_json import flatten
import os
import snowflake.connector
import azure.functions as func
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
# Deserialize request body
req_body = req.get_json()
logging.info('Deserialized input successfully. Request body: ' + json.dumps(req_body))
#Create result JSON to be returned to caller
result = {
"TaskName":req_body['TaskName'],
"Status":"Complete",
"TaskKey": req_body['TaskKey'],
"Query_ID":"",
"Session_ID":"",
"Error_Message":""
}
#Create the Snowflake parameters for connection
USER = <sfusername>
PASSWD = <sfpw>
ACCOUNT = <sfAcc>
WAREHOUSE = <sfwh>
DATABASE = <sfdb>
logging.info('Connection string created')
copy_sql_statement = create_sql_statement(req_body)
logging.info('SQL Statement: ' + copy_sql_statement)
logging.info('Attempting to Connect to Snowflake...')
try:
# Try to connect to Snowflake
connection = snowflake.connector.connect(user=USER, password=PASSWD, account=ACCOUNT, warehouse=WAREHOUSE, database=DATABASE)
logging.info('Connection Successful')
except Exception as e:
raise e
logging.info('Try block for send query started.')
try:
# Call function to execute copy into
logging.info('Sending Query to Snowflake...')
output_list = query_snowflake(req_body, connection, copy_sql_statement) #return queryid and sessionid from Snowflake
queryid = output_list[0]
sessionid = output_list[1]
result['Query_ID']= queryid
result['Session_ID']= sessionid
logging.info('Ensuring Query was Sent...')
status_stmt = create_status_statement(queryid, sessionid)
for x in range(1,14): #it will try for 3.5min in case query is pending
time.sleep(5)
returnValues = get_query_status(status_stmt, connection)
# check result, if error code 604 we know the query canceled.
if returnValues[1] == '604':
result['Status'] = 'Failed'
result['Error_Message'] = 'SQL Execution Canceled'
return func.HttpResponse(json.dumps(result), status_code = 400)
# if its anything but pending, we know the query was sent to snowflake
# 2nd Function worries about the result
elif returnValues[0] != 'PENDING':
result['Status'] = returnValues[0]
logging.info('Query sent to Snowflake Successfully.')
return func.HttpResponse(json.dumps(result), status_code = 200)
else:
logging.info('Loop ' + str(x) + ' completed, trying again...')
time.sleep(10)
#if it exits for loop, mark success, let 2nd function surface any failures.
result['Status'] = 'Success'
return func.HttpResponse(json.dumps(result), status_code = 200)
except Exception as e:
result['Status'] = 'Failed'
result['Error_Message'] = str(e)
logging.info('Copy Into function failed. Error: ' + str(e))
return func.HttpResponse(json.dumps(result),
status_code=400)
def create_sql_statement(req_body):
    """Expand the ``@TaskKey``, ``@CDCMinDate`` and ``@CDCMaxDate`` placeholders.

    Args:
        req_body: Deserialized request body. ``InsertSQL`` holds the SQL
            text; ``TaskKey``, ``CDCMinDate`` and ``CDCMaxDate`` hold the
            replacement values.

    Returns:
        ``InsertSQL`` with each placeholder replaced by ``str()`` of its
        value. Converting with ``str`` lets non-string JSON values, such as
        a numeric ``TaskKey``, be used instead of making ``str.replace``
        raise ``TypeError``. A placeholder that does not occur in the SQL is
        skipped, so its key may be omitted.

    Raises:
        KeyError: if ``InsertSQL`` is missing, or if a placeholder occurs in
            the SQL but its key is missing.
    """
    # NOTE(review): the caller supplies the whole SQL text and it is executed
    # as-is, so this endpoint must only be reachable by trusted callers.
    sql = req_body['InsertSQL']
    # Same order as the original chained .replace() calls; order only matters
    # if a substituted value itself contains a later placeholder.
    for placeholder, key in (
        ('@TaskKey', 'TaskKey'),
        ('@CDCMinDate', 'CDCMinDate'),
        ('@CDCMaxDate', 'CDCMaxDate'),
    ):
        if placeholder in sql:
            sql = sql.replace(placeholder, str(req_body[key]))
    return sql
def query_snowflake(req_body, connection, copy_sql_statement):
    """Submit *copy_sql_statement* on *connection* with ``_no_results=True``.

    Args:
        req_body: Deserialized request body. Not used here; kept so existing
            callers keep working.
        connection: An open Snowflake connection. It is owned by the caller
            and is not closed here.
        copy_sql_statement: SQL text to submit.

    Returns:
        ``[query_id, session_id]``: the cursor's ``sfqid`` after submission
        and the value of ``current_session()``.

    Raises:
        Whatever the connector raises from ``cursor()`` / ``execute()``;
        exceptions propagate untouched (the former ``except ...: raise e``
        added nothing).
    """
    cur = connection.cursor()
    # Capture the session id so the statement can be looked up later in
    # the session's query history.
    sessionid = cur.execute("select current_session()").fetchone()
    # _no_results=True: presumably submits without fetching a result set, so
    # this returns before the statement is guaranteed to have finished.
    cur.execute(copy_sql_statement, _no_results=True)
    return [cur.sfqid, sessionid[0]]
def create_status_statement(queryid, sessionid):
    """Build the SQL that looks up one statement in its session's query history.

    Args:
        queryid: Snowflake query id of the statement to find (spliced in as
            a quoted string literal).
        sessionid: Id of the session that ran it, as returned by
            ``current_session()`` (spliced in unquoted).

    Returns:
        A SELECT of ``execution_status, error_code, query_id`` from
        ``INFORMATION_SCHEMA.QUERY_HISTORY_BY_SESSION`` filtered to *queryid*.
    """
    # In this file both ids come back from Snowflake itself rather than from
    # the HTTP caller, which is why plain string splicing is tolerated here.
    history_source = (
        "TABLE(INFORMATION_SCHEMA.QUERY_HISTORY_BY_SESSION(SESSION_ID => "
        + sessionid
        + "))"
    )
    where_clause = " WHERE QUERY_ID = '" + queryid + "'"
    return "SELECT execution_status, error_code, query_id FROM " + history_source + where_clause
def get_query_status(etl_sql_statement, conn):
    """Run a query-status lookup and return the first row's fields as strings.

    Args:
        etl_sql_statement: SQL selecting ``(execution_status, error_code,
            query_id)``, e.g. the statement built by create_status_statement.
        conn: An open Snowflake connection; a fresh cursor is opened and
            always closed.

    Returns:
        ``(status, error_code, query_id)`` converted with ``str`` (a SQL NULL
        becomes ``'None'``). If the lookup returns no row yet, returns
        ``('PENDING', 'PENDING', '')``. The sentinel is in the status slot,
        which callers compare against ``'PENDING'``, and is also kept in the
        error-code slot, where earlier versions put it.

    Raises:
        Re-raises any error from opening the cursor or running the query,
        after logging it.
    """
    cur = None
    try:
        cur = conn.cursor()
        row = cur.execute(etl_sql_statement).fetchone()
    except Exception as e:
        logging.error('Failed to get query status. Error: ' + str(e))
        raise
    finally:
        # Guard: if conn.cursor() itself failed there is nothing to close, and
        # touching an unbound cursor would mask the original error.
        if cur is not None:
            cur.close()
    if row is None:
        return ('PENDING', 'PENDING', '')
    return (str(row[0]), str(row[1]), str(row[2]))
暂无答案!
目前还没有任何答案,快来回答吧!