bounty将在3天后过期。回答此问题可获得+50声望奖励。dstromberg希望引起更多人关注此问题。
我正在尝试将一个有效的、大型的、复杂的SQL查询转换为SQLAlchemy的ORM。
下面是一个小的示例程序,它演示了我所看到的问题:
#!/usr/bin/env python3
"""
An SSCCE.
Environment variables:
DBU Your database user
DBP Your database password
DBH Your database host
IDB Your initial database
"""
import os
import pprint
from sqlalchemy import create_engine, select
from sqlalchemy.orm import aliased, sessionmaker, declarative_base
from sqlalchemy.sql.expression import func
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
Base = declarative_base()
class NV(Base):
__tablename__ = "tb_nv"
__bind_key__ = "testdb"
__table_args__ = (
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8",
"mysql_collate": "utf8_general_ci",
},
)
id = db.Column("id", db.Integer, primary_key=True, autoincrement=True)
builds = db.relationship("Bld", primaryjoin="(NV.id == Bld.variant_id)")
class Vers(Base):
__tablename__ = "tb_vers"
__bind_key__ = "testdb"
__table_args__ = (
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8",
"mysql_collate": "utf8_general_ci",
},
)
id = db.Column("id", db.Integer, primary_key=True, autoincrement=True)
class St(Base):
__tablename__ = "tb_brst"
__bind_key__ = "testdb"
__table_args__ = ({"mysql_engine": "InnoDB", "mysql_charset": "utf8"},)
id = db.Column("id", db.Integer, primary_key=True, autoincrement=True)
version_id = db.Column(
"version_id",
db.Integer,
db.ForeignKey(
"tb_vers.id",
name="fk_tb_brst_version_id",
onupdate="CASCADE",
ondelete="RESTRICT",
),
nullable=False,
)
branch_id = db.Column(
"branch_id",
db.Integer,
db.ForeignKey(
"tb_br.id",
name="fk_tb_brst_branch_id",
onupdate="CASCADE",
ondelete="RESTRICT",
),
nullable=False,
)
build_id = db.Column(
"build_id",
db.Integer,
db.ForeignKey(
"tb_bld.id",
name="fk_tb_brst_build_id",
onupdate="CASCADE",
ondelete="RESTRICT",
),
nullable=False,
)
version = db.relationship(
"Vers", innerjoin=True, primaryjoin="(St.version_id == Vers.id)"
)
branch = db.relationship(
"Br", innerjoin=True, primaryjoin="(St.branch_id == Br.id)"
)
build = db.relationship(
"Bld", innerjoin=True, primaryjoin="(St.build_id == Bld.id)"
)
class Br(Base):
__tablename__ = "tb_br"
__bind_key__ = "testdb"
__table_args__ = (
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8",
"mysql_collate": "utf8_general_ci",
},
)
id = db.Column("id", db.Integer, primary_key=True, autoincrement=True)
name = db.Column("name", db.String(45), nullable=False)
class Bld(Base):
__tablename__ = "tb_bld"
__bind_key__ = "testdb"
__table_args__ = (
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8",
"mysql_collate": "utf8_general_ci",
},
)
id = db.Column("id", db.Integer, primary_key=True, autoincrement=True)
name = db.Column("name", db.String(100), nullable=False)
variant_id = db.Column(
"variant_id",
db.Integer,
db.ForeignKey(
"tb_nv.id",
name="fk_tb_bld_variant_id",
onupdate="CASCADE",
ondelete="RESTRICT",
),
nullable=False,
)
variant = db.relationship("NV")
def display(values):
"""Display values in a decent way."""
pprint.pprint(values)
def connect():
"""
Connect to Staging for testing.
This is based on https://medium.com/analytics-vidhya/translating-sql-queries-to-sqlalchemy-orm-a8603085762b
...and ./game-publishing/services/api/deploy/celery/config/staging-base.j2
"""
conn_str = "mysql://{}:{}@{}/{}".format(
os.environ["DBU"],
os.environ["DBP"],
os.environ["DBH"],
os.environ["IDB"],
)
engine = create_engine(conn_str)
session = sessionmaker(bind=engine)
sess = session()
return (engine, sess)
def main():
"""A minimal query that exhibits the problem."""
(engine, session) = connect()
Base.metadata.create_all(engine)
v = aliased(Vers, name="v")
v_2 = aliased(Vers, name="v_2")
nv_4 = aliased(NV, name="nv_4")
bs = aliased(St, name="bs")
bs_2 = aliased(St, name="bs_2")
bs_3 = aliased(St, name="bs_3")
br = aliased(Br, name="br")
q1 = select(nv_4.id, func.min(bs_3.build_id)).select_from(bs, v)
q2 = q1.join(v_2, onclause=(bs.version_id == v_2.id))
q3 = q2.join(bs_2, onclause=(br.id == bs_2.branch_id))
result = session.execute(q3)
display(result.scalars().all())
main()
我得到的例外(如果您没有看到相同的结果)是:
sqlalchemy.exc.InvalidRequestError: Can't determine which FROM clause to join from, there are multiple FROMS which can join to this entity. Please use the .select_from() method to establish an explicit left side, as well as providing an explicit ON clause if not present already to help resolve the ambiguity.
我正在使用:
$ python3 -m pip list -v | grep -i sqlalchemy
Flask-SQLAlchemy 2.5.1 /data/home/dstromberg/.local/lib/python3.10/site-packages pip
SQLAlchemy 1.4.36 /data/home/dstromberg/.local/lib/python3.10/site-packages pip
$ python3 -m pip list -v | grep -i mysql
mysqlclient 2.1.1 /data/home/dstromberg/.local/lib/python3.10/site-packages pip
PyMySQL 0.8.0 /data/home/dstromberg/.local/lib/python3.10/site-packages pip
bash-4.2# mysql --version
mysql Ver 14.14 Distrib 5.7.41, for Linux (x86_64) using EditLine wrapper
我在谷歌上搜索了几个小时,但似乎一无所获。我找到了一些类似问题的解决方案,但它们看起来不太相似,没有什么用处。
有什么建议吗?
谢谢!
1条答案
按热度按时间wydwbb8l1#
在select_from()方法中,尝试更明确地从一个表中选择,然后使用join()方法构建所有关系。
此外,为了帮助人们回答,请分享以下内容:
1.您使用的是哪个版本的Sqlalchemy?
1.哪个数据库?有些方法是特定于数据库的。
我建议你试试这样的方法: