postgresql SQLAlchemy 2.0外键和关系问题

bvjveswy  于 2023-06-29  发布在  PostgreSQL
关注(0)|答案(1)|浏览(130)

我正在使用异步postgres会话,遇到了在两个表之间插入和提取数据的问题。
customers表是一个独立的表,与其他表没有关系。该表由另一个独立的插入流程负责更新。
对于quotes表,我需要能够插入带有相应客户标识(唯一客户编号或customers表中的id列)的报价。查询数据时,我想联接customers表,并像下面这样以嵌套结构返回数据。

class Customers(Base):
    """Declarative model for the "customers" table, as posted in the question."""

    __tablename__ = "customers"
    # Table-level unique constraint on the account-number column.
    __table_args__ = (UniqueConstraint("unique_account_number"),)
    id = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str]
    unique_account_number: Mapped[str]
    # Defaults to the current time on insert and is refreshed on update.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to the current time on insert only.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)

class Quotes(Base):
    """Declarative model for the "quotes" table, as posted in the question."""

    __tablename__ = "quotes"

    id = mapped_column(Integer, primary_key=True)
    origin: Mapped[str]
    destination: Mapped[str]
    # NOTE(review): this is the line the question is about. `foreign_key` is
    # not defined anywhere in the visible snippet, so this subscript would
    # raise NameError when the class body runs; relationship() takes the
    # keyword argument `foreign_keys=` instead. The class also declares no
    # column that stores the customer's account number for a join to use.
    customer = relationship(Customers, foreign_key[Customers.unique_account_number])
    # Defaults to the current time on insert and is refreshed on update.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Defaults to the current time on insert only.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)

需要将如下JSON插入quotes表:

{
  "origin": "Ney York City",
  "destination": "Houston",
  "unique_account_number": "A9457HDA"
}

对quotes表执行select语句时,应返回如下JSON:

{
  "origin": "string",
  "destination": "string",
  "customer": {
    "unique_account_number": "ABCD1234",
    "customer_name": "Customer LLC"
  }
}

我如何通过使用ORM来实现这一点?

ds97pgxw

ds97pgxw1#

import sys
import asyncio
import datetime
import json

from sqlalchemy import MetaData, select, UniqueConstraint, Integer, DateTime
from sqlalchemy.sql import update, func

from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationship, joinedload
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, AsyncSession, create_async_engine

class Base(AsyncAttrs, DeclarativeBase):
    """Shared declarative base for the ORM models in this example.

    Combines ``AsyncAttrs`` with ``DeclarativeBase`` and adds no members of
    its own.
    """

class Customers(Base):
    """ORM model for the ``customers`` table.

    ``unique_account_number`` carries a table-level unique constraint.
    """

    __tablename__ = "customers"
    __table_args__ = (UniqueConstraint("unique_account_number"),)

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str]
    unique_account_number: Mapped[str]

    # Stamped with the current time on insert and re-stamped on every update.
    updated_datetime = mapped_column(
        DateTime,
        default=datetime.datetime.now,
        onupdate=datetime.datetime.now,
    )
    # Stamped with the current time on insert only.
    created_datetime = mapped_column(
        DateTime,
        default=datetime.datetime.now,
    )

class Quotes(Base):
    """ORM model for the ``quotes`` table.

    Each quote stores the account number of the customer it belongs to in
    ``unique_account_number``; ``customer`` resolves that value to the
    matching ``Customers`` row. No database-level FOREIGN KEY constraint is
    declared, so the join condition and its direction are spelled out on the
    relationship.
    """

    __tablename__ = "quotes"

    id = mapped_column(Integer, primary_key=True)
    origin: Mapped[str]
    destination: Mapped[str]
    unique_account_number: Mapped[str]
    # Many quotes -> one customer. The referring ("foreign") column is the one
    # on quotes, so that is the column named in foreign_keys. Naming the
    # customers column instead makes SQLAlchemy treat customers as the
    # dependent side, so assigning `customer` or deleting a quote would write
    # to Customers.unique_account_number.
    # Given as a string because the annotation-only column above is not a
    # name in the class body; it is resolved when the mappers are configured.
    customer = relationship(
        Customers,
        primaryjoin="Quotes.unique_account_number==Customers.unique_account_number",
        uselist=False,
        foreign_keys="Quotes.unique_account_number",
    )
    # Stamped with the current time on insert and re-stamped on every update.
    updated_datetime = mapped_column(
        DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now
    )
    # Stamped with the current time on insert only.
    created_datetime = mapped_column(DateTime, default=datetime.datetime.now)


async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
    """Insert one sample quote and one sample customer in a single transaction.

    Args:
        async_session: Session factory used to open the ``AsyncSession``.
    """
    sample_quote = Quotes(
        origin="Ney York City",
        destination="Houston",
        unique_account_number="A9457HDA",
    )
    sample_customer = Customers(
        customer_name="Customer LLC",
        unique_account_number="A9457HDA",
    )
    # The begin() block commits on a clean exit and rolls back on error.
    async with async_session() as session, session.begin():
        session.add_all([sample_quote, sample_customer])

async def select_objects(async_session: async_sessionmaker[AsyncSession]) -> list[Quotes]:
    """Load every quote together with its related customer.

    Args:
        async_session: Session factory used to open the ``AsyncSession``.

    Returns:
        All ``Quotes`` rows; ``customer`` is fetched in the same SELECT via a
        joined eager load.
    """
    stmt = select(Quotes).options(joinedload(Quotes.customer))
    async with async_session() as session:
        async with session.begin():
            result = await session.execute(stmt)
            # unique() de-duplicates the entity rows produced by the join.
            return list(result.unique().scalars().all())

def get_engine():
    """Create the async engine from command-line credentials.

    Reads ``username``, ``password`` and ``db`` from ``sys.argv[1:4]``.

    Returns:
        The engine built by ``create_async_engine`` for the
        ``postgresql+asyncpg`` driver, with SQL echo enabled.

    Raises:
        SystemExit: If fewer than three arguments were supplied.
    """
    # Local import keeps this block self-contained; URL ships with SQLAlchemy.
    from sqlalchemy.engine import URL

    args = sys.argv[1:4]
    if len(args) != 3:
        raise SystemExit(f"usage: {sys.argv[0]} USERNAME PASSWORD DB")
    username, password, db = args

    # Build the URL from components rather than an f-string so characters
    # such as "@", ":" or "/" in the credentials cannot be misparsed as URL
    # delimiters. No host is given, matching the original "user:pass@/db".
    url = URL.create(
        "postgresql+asyncpg",
        username=username,
        password=password,
        database=db,
    )
    return create_async_engine(url, echo=True)

def serialize_quote(quote):
    """Convert a quote into a JSON-serializable nested dict.

    Args:
        quote: Object exposing ``origin``, ``destination`` and ``customer``.
            ``customer``, when not ``None``, exposes
            ``unique_account_number`` and ``customer_name``.

    Returns:
        A dict with keys ``origin``, ``destination`` and ``customer``. The
        ``customer`` value is a nested dict, or ``None`` when the quote has
        no related customer.
    """
    quote_dict = {attr: getattr(quote, attr) for attr in ("origin", "destination")}
    customer = quote.customer
    # A quote without a matching customer has customer=None; emit null
    # instead of failing with AttributeError on the attribute lookups.
    if customer is None:
        quote_dict["customer"] = None
    else:
        quote_dict["customer"] = {
            attr: getattr(customer, attr)
            for attr in ("unique_account_number", "customer_name")
        }
    return quote_dict

async def async_main() -> None:
    """Create the tables, insert the sample rows, and print each quote as JSON.

    The engine is disposed even when one of the steps raises.
    """
    engine = get_engine()
    try:
        # expire_on_commit=False keeps loaded attributes readable after the
        # session's transaction has committed.
        async_session = async_sessionmaker(engine, expire_on_commit=False)

        # create_all is a synchronous API, so it is run through run_sync.
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        await insert_objects(async_session)

        quotes = await select_objects(async_session)
        print([json.dumps(serialize_quote(quote)) for quote in quotes])
    finally:
        # Release pooled connections on failure as well as on success.
        await engine.dispose()

# Run the demo only when executed as a script, not when the module is imported.
if __name__ == "__main__":
    asyncio.run(async_main())

``

相关问题