我正在尝试建立一个fastapi服务器,它将接收一些生物数据作为输入,并对它们运行一些处理。由于处理占用了服务器的所有资源,因此应该按顺序处理查询。但是,服务器应该保持响应,并在缓冲区中添加更多请求。我一直在尝试使用backgroundtasks模块进行此操作,但是在发送第二个查询后,在任务运行时响应会延迟。感谢您的帮助,并提前表示感谢。
import os
import sys
import time
from dataclasses import dataclass
from fastapi import FastAPI, Request, BackgroundTasks
EXPERIMENTS_BASE_DIR = "/experiments/"
QUERY_BUFFER = {}
app = FastAPI()
@dataclass
class Query():
query_name: str
query_sequence: str
experiment_id: str = None
status: str = "pending"
def __post_init__(self):
self.experiment_id = str(time.time())
self.experiment_dir = os.path.join(EXPERIMENTS_BASE_DIR, self.experiment_id)
os.makedirs(self.experiment_dir, exist_ok=False)
def run(self):
self.status = "running"
# perform some long task using the query sequence and get a return code #
self.status = "finished"
return 0 # or another code depending on the final output
@app.post("/")
async def root(request: Request, background_tasks: BackgroundTasks):
query_data = await request.body()
query_data = query_data.decode("utf-8")
query_data = dict(str(x).split("=") for x in query_data.split("&"))
query = Query(**query_data)
QUERY_BUFFER[query.experiment_id] = query
background_tasks.add_task(process, query)
return {"Query created": query, "Query ID": query.experiment_id, "Backlog Length": len(QUERY_BUFFER)}
async def process(query):
""" Process query and generate data"""
ret_code = await query.run()
del QUERY_BUFFER[query.experiment_id]
print(f'Query {query.experiment_id} processing finished with return code {ret_code}.')
@app.get("/backlog/")
def return_backlog():
return {f"Currently {len(QUERY_BUFFER)} jobs in the backlog."}
暂无答案!
目前还没有任何答案,快来回答吧!