我有一个要添加到apscheduler BlockingScheduler的作业列表,其中ThreadPoolExecutor编号与作业数量相同。
我添加的作业使用sqlalchemy并与同一数据库交互,但我收到错误:sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
我在我的基础sqlalchemy设置中使用了scoped_session和sessionmaker。
from os.path import join, realpath
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
db_name = environ.get("DB_NAME")
db_path = realpath(join( "data", db_name))
engine = create_engine(f"sqlite:///{db_path}", pool_pre_ping=True)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
Base = declarative_base()
然后,我添加到apscheduler的调度作业类的示例如下:
from app.data_structures.base import Base, Session, engine
from app.data_structures.job import Job
from app.data_structures.scheduled_job import ScheduledJob
from app.data_structures.user import User
class AccountingProcessorJob(ScheduledJob):
name: str = "Accounting Processor"
def __init__(self, resources: AppResources, depends: List[str] = None) -> None:
super().__init__(resources)
def job_function(self) -> None:
account_dir = realpath(environ.get("ACCOUNTING_DIRECTORY"))
Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)
session = Session()
try:
#do some stuff with the session here e.g.
# with some variables that are setup
user = User(user_name=user_name)
session.add(user)
user.extend(jobs)
session.commit()
except:
session.rollback()
finally:
Session.remove()
我的印象是,使用scoped_session和会话工厂将为每个线程启动一个新会话,并使其成为线程安全的。
其中User和Job是sqlalchemy对象,例如:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.expression import false
from app.data_structures.base import Base
from app.data_structures.job import Job
class User(Base):
__tablename__ = "users"
user_name: Mapped[str] = mapped_column(primary_key=True),
employee_number = Column(Integer)
manager = relationship("User", remote_side=[user_name], post_update=True)
jobs: Mapped[list[Job]] = relationship()
def __init__(
self,
user_name: str,
employee_number: int = None,
manager: str = None,
) -> None:
self.user_name = user_name
self.employee_number = employee_number
self.manager = manager
谁能解释一下我做错了什么,以及如何修复它?
1条答案
按热度按时间ffscu2ro1#
此示例使用SQLAlchemy 1.4.48、apscheduler 3.10.1和python模块sqlite3 3.34.1。
我不确定如何完全重现您的工作功能中正在发生的事情,因为不清楚
jobs
或user.extend(jobs)
中的.extend
来自何处。create_all在每个作业中似乎都不能很好地工作(抛出表已经存在的错误),所以我把它放在自己的作业中,只在创建用户的作业之前运行一次。
我使用随机休眠来确保作业间歇运行,而不是在最大线程数的确切块中运行。