在Python中实现从Azure Redis缓存中并发提取数据

zu0ti5jz  于 2022-10-31  发布在  Redis
关注(0)|答案(2)|浏览(158)

我目前正在使用快速API构建低延迟模型推理API,我们使用Azure Redis缓存标准版本获取特性,使用onnx模型进行快速模型推理。我正在使用aioredis实现Redis中数据读取的并发性。我正在调用Redis的两个特性请求,一个用于获取单个字符串的userID,另一个用于获取字符串列表的product。稍后我使用json解析将其转换为float列表。
对于一个请求,总的来说需要70- 80毫秒,但对于10个以上的并发请求,redis需要400毫秒以上来获取结果,这是巨大的,并可能在负载测试时随着并发用户的增加而线性增加。
从redis获取数据的代码是:

import numpy as np
import json
from ..Helpers.helper import curt_giver, milsec_calc
import aioredis
r = aioredis.from_url("redis://user:host",decode_responses=True)

async def get_user(user:list) -> str:
    user_data = await r.get(user)
    return user_data
async def get_products(product:list)-> list:
    product_data = await r.mget(product)
    return product_data

async def get_features(inputs: dict) -> list:

    st = curt_giver()
    user_data = await get_user(inputs['userId'])
    online_user_data = [json.loads(json.loads(user_data))]
    end = curt_giver()
    print("Time to get user features: ", milsec_calc(st,end))

    st = curt_giver()
    product_data = await get_products(inputs['productIds'])
    online_product_data = []
    for i in product_data:
        online_product_data.append(json.loads(json.loads(i)))
    end = curt_giver()
    print("Time to get product features: ", milsec_calc(st,end))

    user_outputs = np.asarray(online_user_data,dtype=object)
    product_outputs = np.asarray(online_product_data,dtype=object)
    output = np.concatenate([np.concatenate([user_outputs]*product_outputs.shape[0])
    ,product_outputs],axis = 1)
    return output.tolist()

curt_giver()是以毫秒为单位的时间。主文件中的代码是:

from fastapi import FastAPI
    from v1.redis_conn.get_features import get_features

    from model_scoring.score_onnx import score_features
    from v1.post_processing.sort_results import sort_results

    from v1.api_models.input_models import Ranking_Input
    from v1.api_models.output_models import Ranking_Output
    from v1.Helpers.helper import curt_giver, milsec_calc
    import numpy as np

    app = FastAPI()

    # Sending user and product ids through body, 
    # Hence a POST request is well suited for this, GET has unexpected behaviour
    @app.post("/predict", response_model = Ranking_Output)
    async def rank_products(inp_req: Ranking_Input):
      beg = curt_giver()
      reqids = inp_req.dict()
      st = curt_giver()
      features = await get_features(reqids)
      end = curt_giver()

      print("Total Redis duration ( user + products fetch): ", milsec_calc(st,end))

      data = np.asarray(features,dtype=np.float32,order=None)

      st = curt_giver()
      scores = score_features(data)
      end = curt_giver()

      print("ONNX model duration: ", milsec_calc(st,end))

      Ranking_results = sort_results(scores, list(reqids["productIds"]))
      end = curt_giver()
      print("Total time for API: ",milsec_calc(beg,end))
      resp_json = {"requestId": inp_req.requestId,
      "ranking": Ranking_results,
      "zipCode": inp_req.zipCode}

      return resp_json

通过计时,我可以看到,对于一个请求,它花费的时间非常少,但对于并发用户,获取产品数据的时间是线性增加的。获取一个请求的时间,所有值都以毫秒为单位:

Time to get user features:  1
Time to get product features:  47
Total Redis duration ( user + products fetch):  53
ONNX model duration:  2
Total time for API:  60

提取10个以上并发请求所需的时间:

Time to get user features:  151
Time to get user features:  150
Time to get user features:  151
Time to get user features:  52
Time to get user features:  51
Time to get product features:  187
Total Redis duration ( user + products fetch):  433
ONNX model duration:  2
Total time for API:  440
INFO:     127.0.0.1:60646 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  239
Total Redis duration ( user + products fetch):  488
ONNX model duration:  2
Total time for API:  495
INFO:     127.0.0.1:60644 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  142
Total Redis duration ( user + products fetch):  297
ONNX model duration:  2
Total time for API:  303
INFO:     127.0.0.1:60648 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  188
Total Redis duration ( user + products fetch):  342
ONNX model duration:  2
Total time for API:  348

它继续增加,甚至达到900毫秒以上从redis提取两个数据,有没有什么方法可以有效地提取并发数据与低延迟和增加并发请求,如500,而不影响延迟,我的目标是在300毫秒以下,每秒300个并发请求。
在我被困的这一点上任何帮助,我都将感激不尽。

vvppvyoh

vvppvyoh1#

看起来你的一些代码阻塞了。看看你的日志,它开始是异步的(不是并发的,这里没有发生)。但是它一个接一个地处理所有的调用。
查看您的代码,在product_data = await get_products(inputs['productIds'])行之后,它永远不会将控制权返回给事件循环。
如果之后的代码花费了很长时间,则所有其他请求都在等待执行(并将按顺序执行)。我们缺少一些代码(这不是一个MRE),所以很难说出到底发生了什么。例如,我们不知道日志Total Redis duration ( user + products fetch):ONNX model duration:在代码中的什么位置生成,并且您正在使用从未初始化过的变量(例如online_product_data)。
底线是;如果你想在FastAPI中获得更高的并发性,你需要更多的进程来运行你的代码,这意味着更多的Uvicorn工作线程,或者一些负载平衡器和更多的Uvicorn示例(假设您使用的是Uvicorn)。否则,请尝试查找任何可能变为非阻塞的阻塞IO。(不是IO密集型的),所以增加处理请求的Python进程的数量将是最好的选择。

qxgroojn

qxgroojn2#

您的代码是正确的。您应该在处理并行请求时观察协作增益。
为了清楚起见,请在没有JSON解析的情况下测量时间,概要文件中只有这一行:

user_data = await get_user(inputs['userId'])

请检查redis配置中的THREAD_COUNT。如果要测量10个并行请求的性能,请至少将其设置为10。
如果它没有帮助,那么给予另一个库,如aiocache(不幸的是,我不熟悉aioredis)。
请在更改后发布您的时间和结果。

相关问题