How can I use FastAPI Depends objects with asyncio?

Posted by dzhpxtsq on 2021-07-13 in Java

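I have a FastAPI endpoint that needs to download some files from HDFS to the local server.
I am trying to use asyncio to run the function that downloads the files in a separate process.
I am using FastAPI Depends to create the HDFS client and inject the object into the endpoint.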

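import asyncio
import base64  # needed by the endpoint to decode the request payload
import json    # needed by the endpoint to parse the decoded message
from concurrent.futures.process import ProcessPoolExecutor

from fastapi import Depends, FastAPI, Request, Response, status
from hdfs import InsecureClient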

app = FastAPI()

HDFS_URLS = ['http://hdfs-srv.local:50070']

async def run_in_process(fn, *args):
    # get_running_loop() is the recommended call inside a coroutine (Python 3.7+)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result

def connectHDFS():
    # `url` was undefined in the original; use the configured HDFS address
    client = InsecureClient(HDFS_URLS[0])
    yield client

def fr(id, img, client):
    # my code here

    client.download(id_identifica_foto_dir_hdfs,  id_identifica_foto_dir_local, True, n_threads=2)

    # my code here

    return jsonReturn

@app.post("/")
async def main(request: Request, hdfsclient: InsecureClient = Depends(connectHDFS)):

    # Decode the received message
    data = await request.json()
    message = base64.b64decode(data['data']).decode('utf-8').replace("'", '"')
    message = json.loads(message)

    res = await run_in_process(fr, message['id'], message['img'], hdfsclient)

    return {
        "message": res
    }

@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()

@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

But I can't pass the hdfsclient object:

res = await run_in_process(fr, message['id'], message['img'], hdfsclient)

I get the following error:

Traceback (most recent call last):
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
    result = await app(self.scope, self.receive, self.send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/applications.py", line 199, in __call__
    await super().__call__(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 181, in __call__
    raise exc from None
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/middleware/errors.py", line 159, in __call__
    await self.app(scope, receive, _send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 82, in __call__
    raise exc from None
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/exceptions.py", line 71, in __call__
    await self.app(scope, receive, sender)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 227, in handle
    await self.app(scope, receive, send)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
    response = await func(request)
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 202, in app
    dependant=dependant, values=values, is_coroutine=is_coroutine
  File "/home/kleyson/.virtualenvs/reconhecimentofacial/lib/python3.7/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
    return await dependant.call(**values)
  File "./asgi.py", line 86, in main
    res = await run_in_process(fr, message['id'], message['img'], hdfsclient)
  File "./asgi.py", line 22, in run_in_process
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
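
The last frames of the traceback suggest what is going on: arguments handed to a ProcessPoolExecutor are pickled before they are sent to the worker process, and something reachable from hdfsclient holds a thread lock, which cannot be pickled. The sketch below reproduces the same kind of failure without HDFS. FakeClient and try_pickle are hypothetical names made up for this illustration, not part of the hdfs library.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a client object that holds a thread lock."""

    def __init__(self, url):
        self.url = url
        self._lock = threading.Lock()  # lock objects cannot be pickled


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the error message."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


# The exact wording of the message differs between Python versions.
error = try_pickle(FakeClient("http://hdfs-srv.local:50070"))
print(error)
```

Plain values such as strings and numbers pickle fine, so message['id'] and message['img'] are probably not the problem here; the client object is.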

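How can I make hdfsclient available inside def fr() without creating a new connection on every request? In other words, how can I create hdfsclient once at application startup and still use it inside the function?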
