我目前正在使用快速API构建低延迟模型推理API,我们使用Azure Redis缓存标准版本获取特性,使用onnx模型进行快速模型推理。我正在使用aioredis实现Redis中数据读取的并发性。我正在调用Redis的两个特性请求,一个用于获取单个字符串的userID,另一个用于获取字符串列表的product。稍后我使用json解析将其转换为float列表。
对于一个请求,总的来说需要70- 80毫秒,但对于10个以上的并发请求,redis需要400毫秒以上来获取结果,这是巨大的,并可能在负载测试时随着并发用户的增加而线性增加。
从redis获取数据的代码是:
import numpy as np
import json
from ..Helpers.helper import curt_giver, milsec_calc
import aioredis
r = aioredis.from_url("redis://user:host",decode_responses=True)
async def get_user(user:list) -> str:
user_data = await r.get(user)
return user_data
async def get_products(product:list)-> list:
product_data = await r.mget(product)
return product_data
async def get_features(inputs: dict) -> list:
st = curt_giver()
user_data = await get_user(inputs['userId'])
online_user_data = [json.loads(json.loads(user_data))]
end = curt_giver()
print("Time to get user features: ", milsec_calc(st,end))
st = curt_giver()
product_data = await get_products(inputs['productIds'])
online_product_data = []
for i in product_data:
online_product_data.append(json.loads(json.loads(i)))
end = curt_giver()
print("Time to get product features: ", milsec_calc(st,end))
user_outputs = np.asarray(online_user_data,dtype=object)
product_outputs = np.asarray(online_product_data,dtype=object)
output = np.concatenate([np.concatenate([user_outputs]*product_outputs.shape[0])
,product_outputs],axis = 1)
return output.tolist()
curt_giver()是以毫秒为单位的时间。主文件中的代码是:
from fastapi import FastAPI
from v1.redis_conn.get_features import get_features
from model_scoring.score_onnx import score_features
from v1.post_processing.sort_results import sort_results
from v1.api_models.input_models import Ranking_Input
from v1.api_models.output_models import Ranking_Output
from v1.Helpers.helper import curt_giver, milsec_calc
import numpy as np
app = FastAPI()
# Sending user and product ids through body,
# Hence a POST request is well suited for this, GET has unexpected behaviour
@app.post("/predict", response_model = Ranking_Output)
async def rank_products(inp_req: Ranking_Input):
beg = curt_giver()
reqids = inp_req.dict()
st = curt_giver()
features = await get_features(reqids)
end = curt_giver()
print("Total Redis duration ( user + products fetch): ", milsec_calc(st,end))
data = np.asarray(features,dtype=np.float32,order=None)
st = curt_giver()
scores = score_features(data)
end = curt_giver()
print("ONNX model duration: ", milsec_calc(st,end))
Ranking_results = sort_results(scores, list(reqids["productIds"]))
end = curt_giver()
print("Total time for API: ",milsec_calc(beg,end))
resp_json = {"requestId": inp_req.requestId,
"ranking": Ranking_results,
"zipCode": inp_req.zipCode}
return resp_json
通过计时,我可以看到,对于一个请求,它花费的时间非常少,但对于并发用户,获取产品数据的时间是线性增加的。获取一个请求的时间,所有值都以毫秒为单位:
Time to get user features: 1
Time to get product features: 47
Total Redis duration ( user + products fetch): 53
ONNX model duration: 2
Total time for API: 60
提取10个以上并发请求所需的时间:
Time to get user features: 151
Time to get user features: 150
Time to get user features: 151
Time to get user features: 52
Time to get user features: 51
Time to get product features: 187
Total Redis duration ( user + products fetch): 433
ONNX model duration: 2
Total time for API: 440
INFO: 127.0.0.1:60646 - "POST /predict HTTP/1.0" 200 OK
Time to get product features: 239
Total Redis duration ( user + products fetch): 488
ONNX model duration: 2
Total time for API: 495
INFO: 127.0.0.1:60644 - "POST /predict HTTP/1.0" 200 OK
Time to get product features: 142
Total Redis duration ( user + products fetch): 297
ONNX model duration: 2
Total time for API: 303
INFO: 127.0.0.1:60648 - "POST /predict HTTP/1.0" 200 OK
Time to get product features: 188
Total Redis duration ( user + products fetch): 342
ONNX model duration: 2
Total time for API: 348
它继续增加,甚至达到900毫秒以上从redis提取两个数据,有没有什么方法可以有效地提取并发数据与低延迟和增加并发请求,如500,而不影响延迟,我的目标是在300毫秒以下,每秒300个并发请求。
在我被困的这一点上任何帮助,我都将感激不尽。
2条答案
按热度按时间vvppvyoh1#
看起来你的一些代码阻塞了。看看你的日志,它开始是异步的(不是并发的,这里没有发生)。但是它一个接一个地处理所有的调用。
查看您的代码,在
product_data = await get_products(inputs['productIds'])
行之后,它永远不会将控制权返回给事件循环。如果之后的代码花费了很长时间,则所有其他请求都在等待执行(并将按顺序执行)。我们缺少一些代码(这不是一个MRE),所以很难说出到底发生了什么。例如,我们不知道日志
Total Redis duration ( user + products fetch):
和ONNX model duration:
在代码中的什么位置生成,并且您正在使用从未初始化过的变量(例如online_product_data
)。底线是;如果你想在FastAPI中获得更高的并发性,你需要更多的进程来运行你的代码,这意味着更多的Uvicorn工作线程,或者一些负载平衡器和更多的Uvicorn示例(假设您使用的是Uvicorn)。否则,请尝试查找任何可能变为非阻塞的阻塞IO。(不是IO密集型的),所以增加处理请求的Python进程的数量将是最好的选择。
qxgroojn2#
您的代码是正确的。您应该在处理并行请求时观察协作增益。
为了清楚起见,请在没有JSON解析的情况下测量时间,概要文件中只有这一行:
请检查redis配置中的THREAD_COUNT。如果要测量10个并行请求的性能,请至少将其设置为10。
如果它没有帮助,那么给予另一个库,如
aiocache
(不幸的是,我不熟悉aioredis
)。请在更改后发布您的时间和结果。