PostgreSQL: Alembic migrations with an async SQLAlchemy engine

oxf4rvwz, posted on 2024-01-07 in PostgreSQL

Problem statement

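I am converting my SQLAlchemy PostgreSQL driver to async, and I need to run Alembic migrations with the async engine. It is failing.

What I have done so far

  • Reviewed the Alembic documentation
  • Looked for other similar SO threads

Error
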
2023-12-24 14:36:22 Starting entrypoint.sh
2023-12-24 14:36:29 Database is up and running
2023-12-24 14:36:29 Generating migrations
2023-12-24 14:36:38 Generating /app/alembic/versions/9a4735888d4b_initial_migration.py ...  done
2023-12-24 14:36:41 Running migrations
2023-12-24 14:36:45 Migration completed successfully.
2023-12-24 14:36:45 Seeding with test user
2023-12-24 14:36:49 An error occurred while seeding the expressions: AsyncConnection context has not been started and object has not been awaited.
2023-12-24 14:36:50 Inside start_server function
2023-12-24 14:36:50 Starting ngrok
Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml                                
2023-12-24 14:36:22 wait-for-it.sh: waiting 60 seconds for db:5432
2023-12-24 14:36:29 wait-for-it.sh: db:5432 is available after 7 seconds
2023-12-24 14:36:38 INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
2023-12-24 14:36:38 INFO  [alembic.runtime.migration] Will assume transactional DDL.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Will assume transactional DDL.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Running upgrade  -> 9a4735888d4b, Initial migration
2023-12-24 14:36:49 /usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py:775: RuntimeWarning: coroutine 'AsyncConnection.close' was never awaited
2023-12-24 14:36:49   conn.close()


alembic/env.py

from logging.config import fileConfig
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
from alembic import context
from database.database_config import Base, db_url
from services.utils import logger
import traceback

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

if db_url:
    config.set_main_option("sqlalchemy.url", db_url)

def do_run_migrations(connection):
    try:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()
    except Exception as e:
        logger.error(traceback.format_exc())
        raise

async def run_async_migrations():
    connectable = create_async_engine(db_url, poolclass=NullPool)

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

def run_migrations_online():
    asyncio.run(run_async_migrations())

run_migrations_online()


The code below sets up the async engine.

database_config.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from services.utils import logger
import os
import traceback
from config.env_var import *

DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

Base = declarative_base()

db_url = f'postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}'

try:
    engine = create_async_engine(db_url, echo=True)
except Exception as e:
    logger.info(f"Error creating database engine: {e}")
    logger.info(traceback.format_exc())
    raise

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    db = AsyncSessionLocal()
    try:
        yield db
    except Exception as e:
        logger.info(f"Failed with db_url: {db_url}")
        logger.info(f"Database session error: {e}")
        logger.info(traceback.format_exc())
        raise
    finally:
        await db.close()


Finally, here is my init_db.py, which I ran once in local development mode:

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

from database.models import *
from database.enums import *
from database.database_config import Base, engine, db_url

async def create_tables():
    # Use the async engine from your database configuration
    async_engine = create_async_engine(db_url)

    # Asynchronous table creation
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

if __name__ == "__main__":
    asyncio.run(create_tables())


How do I create the async migrations correctly?

jckbn6z7 (answer 1)

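There were a few misunderstandings in this question.
First, init_db.py is redundant, because Alembic takes care of the initial migration, so I removed it from my flow.
Then I imported all of my SQLAlchemy models and enum types into my database_config.py, re-ran the script, and the migration succeeded. Alembic's autogenerate only sees tables that are registered on Base.metadata, and a model is registered only once the module that defines it has been imported.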
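
To illustrate why the import matters, here is a minimal sketch, assuming SQLAlchemy 1.4 or later. The User model and the users table are hypothetical stand-ins for whatever lives in database/models.py; no database connection is needed, since the point is only how Base.metadata gets populated.

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

# Stand-in for the Base defined in database_config.py.
Base = declarative_base()

# No model class has been defined (or imported) yet, so the metadata is empty.
# If Alembic read target_metadata at this point, autogenerate would find no tables.
tables_before = sorted(Base.metadata.tables)

# Hypothetical model. In the real project this class would live in
# database/models.py and be registered as soon as that module is imported.
class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)

# Defining the class registered its table on Base.metadata.
tables_after = sorted(Base.metadata.tables)

print(tables_before)
print(tables_after)
```

In the project from the question, the equivalent step would be the lines `from database.models import *` and `from database.enums import *`, executed before alembic/env.py reads Base.metadata. If the model modules themselves import Base from database_config.py, those imports probably need to sit below the Base definition to avoid a circular import.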
