postgresql 如何在SQLAlchemy 2中添加事务,0 Python

mgdq6dx1  于 2023-04-29  发布在  PostgreSQL
关注(0)|答案(1)|浏览(126)

我有一个这样的函数,我需要添加交易:

async def actualize_objects(objects, object_repository) -> None:
    to_create, to_update = [], []
    await object_repository.archive_all()
    already_have = await object_repository.get_all_ids()
    for object in objects:
        if object.id not in already_have:
            to_create.append(object)
        else:
            to_update.append(object)

    await object_repository.create_all(to_create)
    await object_repository.update_all(to_update)

它接受对象并更新数据库中有关对象的信息。await object_repository.archive_all()步骤只有在您确定所有其他步骤也将执行时才应执行。如何添加事务,以便在其余代码失败时,数据库回滚到其原始状态?
我试图这样做,但它不工作,数据库不回滚。我使用的是SQLAlchemy 2。0,Postgres数据库。

async def actualize_objects(objects, object_repository) -> None:
    await object_repository._session.begin_nested()
    try:
        to_create, to_update = [], []
        await object_repository.archive_all()
        already_have = await object_repository.get_all_ids()
        for object in objects:
            if object.id not in already_have:
                to_create.append(object)
            else:
                to_update.append(object)
        await object_repository.create_all(to_create)
        await object_repository.update_all(to_update)
    except Exception as error:
        print('ошибка')
        await object_repository._session.rollback()
jobtbby3

jobtbby31#

您应该使用async with语句进行事务,这样您可以确保如果一切顺利,它将被提交,否则,它将被回滚。让我们也添加一个raise语句,以防出现需要传播的错误。
编辑:rollback()可能有问题

from sqlalchemy.ext.asyncio import AsyncSession

async def actualize_objects(objects, object_repository) -> None:
    async with object_repository._session.begin():
        try:
            to_create, to_update = [], []
            await object_repository.archive_all()
            already_have = await object_repository.get_all_ids()
            for object in objects:
                if object.id not in already_have:
                    to_create.append(object)
                else:
                    to_update.append(object)
            await object_repository.create_all(to_create)
            await object_repository.update_all(to_update)
        except Exception as error:
            print('Error:', error)
            raise

相关问题