我有1,000,000条记录,我正试图输入到数据库中,不幸的是,其中一些记录不符合数据库模式。
在一个记录失败的那一刻我在做:
1.回滚到数据库
1.观察异常
1.解决这个问题
1.再次运行。“
我希望建立一个脚本,它将保存一方所有的“坏”记录,但将提交所有正确的。
当然,我可以一个一个地提交,然后当提交失败时,回滚并提交下一个,但我会付出“运行时代价”,因为代码会运行很长时间。
在下面的例子中,我有两个模型:File
和Client
。一个客户端有许多文件。
在commit.py文件中,我希望一次提交1M个文件对象,或者批量提交(1k)。目前,我只知道当我在最后提交时,什么时候失败了,有没有办法知道哪些对象是“坏的”(例如外键的完整性错误),也就是说,将一侧(在另一个列表中)放置,但提交所有“好的”
非常感谢你的帮助
model.py
个字符
from sqlalchemy import Column, DateTime, String, func, Integer, ForeignKey
from . import base
class Client(base):
__tablename__ = 'clients'
id = Column(String, primary_key=True)
class File(base):
__tablename__ = 'files'
id = Column(Integer, primary_key=True, autoincrement=True)
client_id = Column(String, ForeignKey('clients.id'))
path = Column(String)
#init.py
import os
from dotenv import load_dotenv
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
load_dotenv()
db_name = os.getenv("DB_NAME")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_uri = 'postgresql://' + db_user + ':' + db_password + '@' + db_host + ':' + db_port + '/' + db_name
print(f"product_domain: {db_uri}")
base = declarative_base()
engine = create_engine(db_uri)
base.metadata.bind = engine
Session = sessionmaker(bind=engine)
session = Session()
conn = engine.connect()
#commit.py
from . import session
def commit(list_of_1m_File_objects_records):
#I wish to for loop over the rows and if a specific row raise exception to insert it to a list and handle after wards
for file in list_of_1m_File_objects_records:
session.add(file)
session.commit()
# client:
# id
# "a"
# "b"
# "c"
# file:
# id|client_id|path
# --|---------|-------------
# 1 "a" "path1.txt"
# 2 "aa" "path2.txt"
# 3 "c" "path143.txt"
# 4 "a" "pat.txt"
# 5 "b" "patb.txt"
# wish the file data would enter the database although it has one record "aa" which will raise integrity error
字符串
2条答案
按热度按时间58wvjzkj1#
由于我不能发表评论,我建议使用psycopg2和sqlAlchemy来生成与数据库的连接,然后在查询的末尾使用一个带有“On conflict”的查询来添加和提交数据
4dbbbstv2#
当然,我可以一个一个地提交,然后当提交失败时,回滚并提交下一个,但我会付出“运行时代价”,因为代码会运行很长时间。
这个价格的来源是什么?如果是fsync速度,你可以通过在本地连接上将synchronous_commit设置为off来消除大部分成本。如果你中途崩溃,那么你需要弄清楚一旦恢复,哪些实际上已经被记录下来,这样你就知道在哪里重新启动,但我不认为这很难做到。这种方法应该让你用最少的工作获得最大的收益。
现在我只明白当我最后承诺的时候,
这听起来像是你在使用延迟约束,有什么原因吗?
有没有办法知道哪些对象是“坏”的(例如外键的完整性错误)
对于该示例的情况,在开始之前将所有客户端id读入字典(假设它们适合RAM),然后在python端测试Files,以便您可以在尝试插入它们之前拒绝孤儿。