SQL Server 使用sqlAlchemy的存储过程

pokxtpni  于 2022-12-17  发布在  其他
关注(0)|答案(8)|浏览(255)

如何用sqlAlchemy调用sql server的存储过程?

7gcisfzg

7gcisfzg1#

引擎和连接具有可用于任意SQL语句的execute()方法,会话也是如此。例如:

results = sess.execute('myproc ?, ?', [param1, param2])

如果需要,可以使用outparam()创建输出参数(或者对于绑定参数,使用bindparam()isoutparam=True选项)

oxf4rvwz

oxf4rvwz2#

  • context*:我使用flak-sqlalchemy和MySQL,但没有ORMMap。通常,我用途:
# in the init method
_db = SqlAlchemy(app)

#... somewhere in my code ...
_db.session.execute(query)

不支持调用现成的存储过程:callproc不是通用的,而是特定于mysql连接器的。
对于没有out参数的存储过程,可以执行如下查询

_db.session.execute(sqlalchemy.text("CALL my_proc(:param)"), param='something')

像往常一样。当你有out params时事情会变得更复杂...
使用out参数的一种方法是通过engine.raw_connection()访问底层连接器。

conn = _db.engine.raw_connection()
# do the call. The actual parameter does not matter, could be ['lala'] as well
results = conn.cursor().callproc('my_proc_with_one_out_param', [0])
conn.close()   # commit
print(results) # will print (<out param result>)

这很好,因为我们可以访问out参数,但是这个连接不是由flask会话管理的。这意味着它不会像其他托管查询一样被提交/中止...(只有当您的过程有副作用时才有问题)。
最后,我终于做到了:

# do the call and store the result in a local mysql variabl
# the name does not matter, as long as it is prefixed by @
_db.session.execute('CALL my_proc_with_one_out_param(@out)')
# do another query to get back the result
result = _db.session.execute('SELECT @out').fetchone()

result将是具有一个值的元组:out param。这不是理想的,但危险性最小:如果在会话期间另一个查询失败,则过程调用也将被中止(回滚)。

r8uurelv

r8uurelv3#

仅执行使用func创建的程序对象:

from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker

engine = create_engine('sqlite://', echo=True)
print engine.execute(func.upper('abc')).scalar() # Using engine
session = sessionmaker(bind=engine)()
print session.execute(func.upper('abc')).scalar() # Using session
fafcakar

fafcakar4#

在MySQL中使用SQLAlchemy调用存储过程的最简单方法是使用Engine.raw_connection()callproc方法。call_proc需要调用存储过程所需的过程名和参数。

def call_procedure(function_name, params):
       connection = cloudsql.Engine.raw_connection()
       try:
           cursor = connection.cursor()
           cursor.callproc(function_name, params)
           results = list(cursor.fetchall())
           cursor.close()
           connection.commit()
           return results
       finally:
           connection.close()
bnl4lu3b

bnl4lu3b5#

假设您已经使用sessionmaker()创建了会话,您可以使用以下函数:

def exec_procedure(session, proc_name, params):
    sql_params = ",".join(["@{0}={1}".format(name, value) for name, value in params.items()])
    sql_string = """
        DECLARE @return_value int;
        EXEC    @return_value = [dbo].[{proc_name}] {params};
        SELECT 'Return Value' = @return_value;
    """.format(proc_name=proc_name, params=sql_params)

    return session.execute(sql_string).fetchall()

现在,您可以使用如下所示的参数执行存储过程'MyProc':

params = {
    'Foo': foo_value,
    'Bar': bar_value
}
exec_procedure(session, 'MyProc', params)
8qgya5xd

8qgya5xd6#

出于对我的一个项目的迫切需要,我编写了一个处理存储过程调用的函数。
给你:

import sqlalchemy as sql

def execute_db_store_procedure(database, types, sql_store_procedure, *sp_args):
    """ Execute the store procedure and return the response table.

    Attention: No injection checking!!!

    Does work with the CALL syntax as of yet (TODO: other databases).

    Attributes:
        database            -- the database
        types               -- tuple of strings of SQLAlchemy type names.
                               Each type describes the type of the argument
                               with the same number.
                               List: http://docs.sqlalchemy.org/en/rel_0_7/core/types.html
        sql_store_procudure -- string of the stored procedure to be executed
        sp_args             -- arguments passed to the stored procedure
    """
    if not len(types) == len(sp_args):
        raise ValueError("types tuple must be the length of the sp args.")

    # Construch the type list for the given types
    # See
    # http://docs.sqlalchemy.org/en/latest/core/sqlelement.html?highlight=expression.text#sqlalchemy.sql.expression.text
    # sp_args (and their types) are numbered from 0 to len(sp_args)-1
    type_list = [sql.sql.expression.bindparam(
                    str(no), type_=getattr(sql.types, typ)())
                        for no, typ in zip(range(len(types)), types)]

    try:
        # Adapts to the number of arguments given to the function
        sp_call = sql.text("CALL `%s`(%s)" % (
                sql_store_procedure,
                ", ".join([":%s" % n for n in range(len(sp_args))])),
            bindparams=type_list
        )
        #raise ValueError("%s\n%s" % (sp_call, type_list))
        with database.engine.begin() as connection:
            return connection.execute(
                sp_call,
                # Don't do this at home, kids...
                **dict((str(no), arg)
                    for (no, arg) in zip(range(len(sp_args)), sp_args)))
    except sql.exc.DatabaseError:
        raise

它使用的是CALL语法,所以MySQL应该能像预期的那样工作。我猜MSSQL使用EXEC而不是call,语法也有一点不同。所以让它与服务器无关取决于你,但应该不难。

ozxc1zmp

ozxc1zmp7#

另一种解决方案:

query = f'call Procedure ("{@param1}", "{@param2}", "{@param3}")'    
sqlEngine = sqlalchemy.create_engine(jdbc)
conn = sqlEngine.connect() 
df = pd.read_sql(query,conn,index_col=None)
2sbarzqh

2sbarzqh8#

我有一个postgresql的存储过程,签名如下-

CREATE OR REPLACE PROCEDURE inc_run_count(
        _host text,
        _org text,
        _repo text,
        _rule_ids text[]
    )

在经历了许多错误和尝试之后,我发现这就是如何从python3调用该过程。

def update_db_rule_count(rule_ids: List[str], host: str, org: str, repo: str):
        param_dict = {"host": host, "org": org, "repo": repo, "rule_ids": f'{{ {",".join(rule_ids)} }}'}

        with AnalyticsSession() as analytics_db:
            analytics_db.execute('call inc_run_count(:host, :org, :repo, :rule_ids)', param_dict)
            analytics_db.commit()

相关问题