How can I run multiple asyncpg (PostgreSQL) queries concurrently?

fkvaft9z, posted on 2023-06-29 in PostgreSQL

I am using PostgreSQL with asyncpg.

from typing import Optional

import asyncpg


class DbPoolSingleton:
    db_pool: Optional[asyncpg.pool.Pool] = None

    @staticmethod
    async def create_pool():
        config = get_postgres_config()
        pool: asyncpg.Pool = await asyncpg.create_pool(
            ...,
            min_size=30,
            max_size=40
        )
        print("Pool created")
        return pool

    @staticmethod
    async def get_pool() -> asyncpg.pool.Pool:
        if not DbPoolSingleton.db_pool:
            DbPoolSingleton.db_pool = await DbPoolSingleton.create_pool()
        return DbPoolSingleton.db_pool

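    @staticmethod
    async def terminate_pool():
        # Only terminate a pool that exists; do not create one just to close it.
        if DbPoolSingleton.db_pool is not None:
            DbPoolSingleton.db_pool.terminate()
            DbPoolSingleton.db_pool = None
            print("Pool terminated")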

import asyncio
import datetime

from helpers.pg_rdb_helper import DbPoolSingleton, PgDb


async def test_synchronous():
    db_pool = await DbPoolSingleton.get_pool()
    sql = """samplesql"""
    # Acquire one connection; it is released automatically when the block exits.
    async with db_pool.acquire() as conn:
        db = PgDb(conn)
        total_start = datetime.datetime.now()

        # Run the queries one after another on the same connection.
        for i in range(20):
            start = datetime.datetime.now()
            rows = await db.select(sql)
            end = datetime.datetime.now()
            print(f"query {i} took: ", (end - start).total_seconds())
        total_end = datetime.datetime.now()
    print("total query took: ", (total_end - total_start).total_seconds())

=> total query took: 2.721282

async def test_asynchronous():
    db_pool = await DbPoolSingleton.get_pool()
    sql = """samplesql"""
    total_start = datetime.datetime.now()
    tasks = []
    for i in range(20):
        # Each task gets its own connection (note: it is never released here).
        db = PgDb(await db_pool.acquire())
        task = asyncio.create_task(db.select(sql))
        tasks.append(task)
    # Wait for all 20 queries to finish.
    await asyncio.gather(*tasks)
    total_end = datetime.datetime.now()
    print("total query took: ", (total_end - total_start).total_seconds())

=> total query took: 2.131297
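Here I have a simple function that issues multiple queries. The first version is synchronous: it awaits each query in turn and gets no concurrency from asyncio. The second version uses asyncio.gather to run the queries in the background (at least, that is my assumption).
As the timings show, the asynchronous version is only slightly faster than the synchronous one (2.13 s versus 2.72 s), far from the speedup I expected. I understand that the asynchronous version pays some overhead to acquire a connection from the pool for each query, which slows it down somewhat.
So how can we fix the asynchronous version to get the benefit of asyncpg and asyncio?
After some investigation I tried to fix the asynchronous version, but the attempt produced an error.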

  • Asynchronous fix 1
async def test_asynchronous():
    db_pool = await DbPoolSingleton.get_pool()
    sql = """samplesql"""
    total_start = datetime.datetime.now()
    tasks = []
    async with db_pool.acquire() as conn:
        db = PgDb(conn)
        for i in range(20):
            task = asyncio.create_task(db.select(sql))
            tasks.append(task)
        await asyncio.gather(*tasks)
    total_end = datetime.datetime.now()
    print(f"total query took: ", (total_end-total_start).total_seconds())

I got this error =>

asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

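Basically, this fix makes multiple coroutines share the same database connection, and a single connection can only run one operation at a time, which is why I get this error.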
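
The mechanism behind this error can be illustrated with a toy model. This sketch is not asyncpg's implementation: `ToyConnection` and `ToyInterfaceError` are hypothetical names, and the busy flag only imitates the rule that one connection serves one operation at a time.

```python
import asyncio


class ToyInterfaceError(Exception):
    """Hypothetical stand-in for asyncpg's InterfaceError."""


class ToyConnection:
    """Toy connection that refuses a second operation while one is running."""

    def __init__(self):
        self._busy = False

    async def fetch(self, sql):
        if self._busy:
            raise ToyInterfaceError(
                "cannot perform operation: another operation is in progress"
            )
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # stand-in for waiting on the server
            return [sql]
        finally:
            self._busy = False


async def shared_connection_demo():
    conn = ToyConnection()
    # Three tasks share one connection, as in the snippet above.
    return await asyncio.gather(
        *(conn.fetch("samplesql") for _ in range(3)),
        return_exceptions=True,
    )


if __name__ == "__main__":
    for outcome in asyncio.run(shared_connection_demo()):
        print(repr(outcome))
```

In this model the first task claims the connection and the other two fail immediately, because they start while the first is still suspended at its `await`.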

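At this point I am stuck on this problem. Could someone please help me solve it?
My question is: how can we fix the asynchronous version to get the benefit of asyncpg and asyncio?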

ki0zmccv #1

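As you said, the error occurs because several operations run on the same connection at the same time. You can fix it by awaiting each call in your code, that is, "await db.select(sql)".
This waits for the previous query to finish before starting the next one. The downside is that the total execution time increases.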

Code:

async with db_pool.acquire() as conn:
    db = PgDb(conn)
    # Await each query so only one operation runs on the connection at a time.
    for i in range(20):
        await db.select(sql)
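
The loop above is safe but strictly sequential. If concurrency is the goal, one commonly used pattern is to let each task borrow its own connection from the pool, so that no connection ever runs two operations at once. The following is a minimal sketch under stated assumptions: `fetch_one`, `fetch_many`, `FakePool` and `FakeConnection` are hypothetical names. The fake classes only mimic the shape of asyncpg's `pool.acquire()` async context manager and `conn.fetch()` so the sketch runs without a database, and the 0.05 s sleep is an arbitrary stand-in for query latency.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical stand-in for an asyncpg connection."""

    async def fetch(self, sql):
        await asyncio.sleep(0.05)  # arbitrary stand-in for network + query time
        return [{"sql": sql}]


class FakePool:
    """Hypothetical stand-in for asyncpg.Pool, limited to max_size connections."""

    def __init__(self, max_size):
        self._slots = asyncio.Semaphore(max_size)

    @asynccontextmanager
    async def acquire(self):
        async with self._slots:  # wait until a connection slot is free
            yield FakeConnection()


async def fetch_one(pool, sql):
    # Each task borrows its own connection and returns it when done.
    async with pool.acquire() as conn:
        return await conn.fetch(sql)


async def fetch_many(pool, sql, count):
    # Schedule all queries at once; gather returns results in submission order.
    return await asyncio.gather(*(fetch_one(pool, sql) for _ in range(count)))


async def main():
    pool = FakePool(max_size=40)
    start = time.perf_counter()
    results = await fetch_many(pool, "samplesql", 20)
    elapsed = time.perf_counter() - start
    return len(results), elapsed


if __name__ == "__main__":
    count, elapsed = asyncio.run(main())
    print(f"{count} queries took {elapsed:.3f}s")
```

With a real pool, `fetch_one` would receive the object returned by `asyncpg.create_pool(...)` instead of `FakePool`. asyncpg's `Pool` also provides `pool.fetch(sql)`, which acquires and releases a connection around a single query. Whether this beats the sequential loop in practice depends on whether the database server and network can actually serve the queries in parallel, so the timing of the fake pool says nothing about a real workload.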
