我正在用FastAPI在python中做一个服务器,我希望有一个与我的API无关的函数,每5分钟在后台运行一次(比如从API中检查内容,并根据响应打印内容)
我试过创建一个线程来运行start_worker
函数,但是它什么也不打印。
有人知道怎么做吗?
def start_worker():
print('[main]: starting worker...')
my_worker = worker.Worker()
my_worker.working_loop() # this function prints "hello" every 5 seconds
if __name__ == '__main__':
print('[main]: starting...')
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
_worker_thread = Thread(target=start_worker, daemon=False)
_worker_thread.start()
2条答案
按热度按时间cidc1ykv1#
您应该在调用
uvicorn.run
之前启动Thread,因为uvicorn.run
正在阻塞该线程。PS:在你的问题中你说你希望后台任务每5分钟运行一次,但是在你的代码中你说每5秒运行一次。下面的例子假设你希望是后者。如果你希望它每5分钟运行一次,那么把时间调整为60 * 5**。
您还可以使用FastAPI的启动事件启动线程,只要它可以在应用程序启动之前运行即可。
您可以改为使用重复的Event scheduler执行后台任务,如下所示:
68bkxrlz2#
@Chris的解决方案工作正常!
但是,如果您想改进它并删除冗余的
start_scheduler
方法,只需将sc
参数直接传递给print_event
方法,kwargs
参数如下所示: