我使用PostgreSQL和asyncpg。
class DbPoolSingleton:
db_pool: Optional[asyncpg.pool.Pool] = None
@staticmethod
async def create_pool():
config = get_postgres_config()
pool: asyncpg.Pool = await asyncpg.create_pool(
...,
min_size=30,
max_size=40
)
print("Pool created")
return pool
@staticmethod
async def get_pool() -> asyncpg.pool.Pool:
if not DbPoolSingleton.db_pool:
DbPoolSingleton.db_pool = await DbPoolSingleton.create_pool()
return DbPoolSingleton.db_pool
@staticmethod
async def terminate_pool():
(await DbPoolSingleton.get_pool()).terminate()
DbPoolSingleton.db_pool = None
print("Pool terminated")
import asyncio
from helpers.pg_rdb_helper import DbPoolSingleton, PgDb
async def test_synchronous():
conn = await (await DbPoolSingleton.get_pool()).acquire()
db = PgDb(conn)
sql = """samplesql"""
total_start = start = datetime.datetime.now()
for i in range(20):
start = datetime.datetime.now()
rows = await db.select(sql)
end = datetime.datetime.now()
print(f"{i}st query took: ", (end-start).total_seconds())
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
=>**total query taken:2.721282
async def test_asynchronous():
db_pool = await DbPoolSingleton.get_pool()
sql = """samplesql"""
total_start = datetime.datetime.now()
tasks = []
for i in range(20):
db = PgDb(await db_pool.acquire())
task = asyncio.create_task(db.select(sql))
tasks.append(task)
await asyncio.gather(*tasks)
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
=>**总查询时间:2.131297
这里,我有一个简单的多查询调用函数,第一个版本是同步版本,它等待每个查询而不使用asyncio
,第二个版本是使用asyncio.gather
在后台运行这些查询(至少这是我的假设)。
结果是,正如你所看到的,asynchronous version
的结果完全慢于synchronous version
。基本上,我知道在asynchronous version
中,我们有一些开销用于为每个查询从池中获取连接,这导致它有点慢。
那么我们如何修复asynchronous version
以获得asyncpg
和asyncio
的优势呢?
经过我的调查,我有一些修复这个asynchronous version
,但他们的机器人得到了一些错误。
- 异步修复1*
async def test_asynchronous():
db_pool = await DbPoolSingleton.get_pool()
sql = """samplesql"""
total_start = datetime.datetime.now()
tasks = []
async with db_pool.acquire() as conn:
db = PgDb(conn)
for i in range(20):
task = asyncio.create_task(db.select(sql))
tasks.append(task)
await asyncio.gather(*tasks)
total_end = datetime.datetime.now()
print(f"total query took: ", (total_end-total_start).total_seconds())
我得到了这个错误=>
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
基本上,这个修复使多个协程使用相同的连接到数据库,所以我得到了这个错误。.
现在,我放弃了这个问题,请帮我解决一下吧??
我的问题是:那么我们如何修复asynchronous version
以获得asyncpg
和asyncio
的优势。
1条答案
按热度按时间ki0zmccv1#
正如您所说,错误是因为同时执行多个操作。您可以通过在代码中添加“await db.select(sql)”来修复此错误。
它所做的是等待上一个查询完成后再开始新的查询。这样做的问题是总执行时间会增加。
代码: